Class: Etcd::Client
- Inherits:
-
Object
- Object
- Etcd::Client
- Includes:
- Constants, Loggable, Requestable
- Defined in:
- lib/etcd/client.rb,
lib/etcd/client/failover.rb,
lib/etcd/client/protocol.rb,
lib/etcd/client/observing.rb
Constant Summary
Constants included from Constants
Etcd::Constants::S_ACTION, Etcd::Constants::S_AND, Etcd::Constants::S_DIR, Etcd::Constants::S_EXPIRATION, Etcd::Constants::S_INDEX, Etcd::Constants::S_KEY, Etcd::Constants::S_KEYS, Etcd::Constants::S_LOCATION, Etcd::Constants::S_NODE, Etcd::Constants::S_NODES, Etcd::Constants::S_PREV_NODE, Etcd::Constants::S_SLASH, Etcd::Constants::S_TTL, Etcd::Constants::S_VALUE
Instance Attribute Summary collapse
-
#cluster ⇒ Object
Returns the value of attribute cluster.
-
#heartbeat_freq ⇒ Object
Returns the value of attribute heartbeat_freq.
-
#leader ⇒ Object
kinda magic accessor-method: - will reinitialize leader && cluster if needed.
-
#observers ⇒ Object
Returns the value of attribute observers.
-
#seed_uris ⇒ Object
Returns the value of attribute seed_uris.
-
#status ⇒ Object
:up/:down.
Class Method Summary collapse
-
.connect(options = {}) ⇒ Object
Create a new client and connect it to the etcd cluster.
Instance Method Summary collapse
-
#connect ⇒ Object
Connects to the etcd cluster.
-
#delete(key) ⇒ String
Remove a key and its value.
-
#exists?(key) ⇒ true, false
Returns true if the specified key exists.
-
#get(key) ⇒ String, Hash
Gets the value or values for a key.
-
#handle_redirected(uri, response) ⇒ Object
Only happens on attempted write to a follower node in cluster.
-
#info(key) ⇒ Hash
Returns info about a key, such as TTL, expiration and index.
-
#initialize(options = {}) ⇒ Client
constructor
A new instance of Client.
- #inspect ⇒ Object
- #key_uri(key) ⇒ Object
- #leader_uri ⇒ Object
-
#observe(prefix, options = {}, &handler) ⇒ #cancel, #join
Sets up a continuous watch of a key or prefix.
- #observers_overview ⇒ Object
-
#refresh_observers ⇒ Object
Re-initiates watches after leader election.
- #refresh_observers_if_needed ⇒ Object
-
#set(key, value, options = {}) ⇒ String
Sets the value of a key.
- #start_heartbeat_if_needed ⇒ Object
-
#update(key, value, expected_value, options = {}) ⇒ true, false
Atomically sets the value for a key if the current value for the key matches the specified expected value.
-
#update_cluster ⇒ Object
Creates a Cluster-instance from ‘@seed_uris` and stores the cluster leader information.
-
#watch(prefix, options = {}) {|value, key, info| ... } ⇒ Object
Watches a key or prefix and calls the given block when with any changes.
- #watch_uri(key) ⇒ Object
Methods included from Loggable
Methods included from Requestable
#http_client, #request, #reset_http_client!
Constructor Details
#initialize(options = {}) ⇒ Client
Returns a new instance of Client.
7 8 9 10 11 12 |
# File 'lib/etcd/client/failover.rb', line 7 def initialize(={}) @observers = {} @seed_uris = [:uris] || ['http://127.0.0.1:4001'] @heartbeat_freq = [:heartbeat_freq].to_f http_client.redirect_uri_callback = method(:handle_redirected) end |
Instance Attribute Details
#cluster ⇒ Object
Returns the value of attribute cluster.
9 10 11 |
# File 'lib/etcd/client.rb', line 9 def cluster @cluster end |
#heartbeat_freq ⇒ Object
Returns the value of attribute heartbeat_freq.
12 13 14 |
# File 'lib/etcd/client.rb', line 12 def heartbeat_freq @heartbeat_freq end |
#leader ⇒ Object
kinda magic accessor-method:
-
will reinitialize leader && cluster if needed
54 55 56 |
# File 'lib/etcd/client/failover.rb', line 54 def leader @leader end |
#observers ⇒ Object
Returns the value of attribute observers.
13 14 15 |
# File 'lib/etcd/client.rb', line 13 def observers @observers end |
#seed_uris ⇒ Object
Returns the value of attribute seed_uris.
11 12 13 |
# File 'lib/etcd/client.rb', line 11 def seed_uris @seed_uris end |
#status ⇒ Object
:up/:down
14 15 16 |
# File 'lib/etcd/client.rb', line 14 def status @status end |
Class Method Details
.connect(options = {}) ⇒ Object
Create a new client and connect it to the etcd cluster.
This method is the preferred way to create a new client, and is the equivalent of ‘Client.new(options).connect`. See #initialize and #connect for options and details.
22 23 24 |
# File 'lib/etcd/client/failover.rb', line 22 def self.connect(={}) self.new().connect end |
Instance Method Details
#connect ⇒ Object
Connects to the etcd cluster
29 30 31 32 33 |
# File 'lib/etcd/client/failover.rb', line 29 def connect update_cluster start_heartbeat_if_needed self end |
#delete(key) ⇒ String
Remove a key and its value.
The previous value is returned, or ‘nil` if the key did not exist.
80 81 82 83 84 |
# File 'lib/etcd/client/protocol.rb', line 80 def delete(key) data = request_data(:delete, key_uri(key)) return nil unless data data[S_PREV_NODE][S_VALUE] end |
#exists?(key) ⇒ true, false
Returns true if the specified key exists.
This is a convenience method and equivalent to calling #get and checking if the value is ‘nil`.
92 93 94 |
# File 'lib/etcd/client/protocol.rb', line 92 def exists?(key) !!get(key) end |
#get(key) ⇒ String, Hash
Gets the value or values for a key.
If the key represents a directory with direct decendants (e.g. “/foo” for “/foo/bar”) a hash of keys and values will be returned.
39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/etcd/client/protocol.rb', line 39 def get(key) data = request_data(:get, key_uri(key)) return nil unless data if nodes = data[S_NODE][S_NODES] nodes.each_with_object({}) do |node, acc| acc[node[S_KEY]] = node[S_VALUE] end else data[S_NODE][S_VALUE] end end |
#handle_redirected(uri, response) ⇒ Object
Only happens on attempted write to a follower node in cluster. Means:
-
leader changed since last update
Solution: just get fresh cluster status
71 72 73 74 |
# File 'lib/etcd/client/failover.rb', line 71 def handle_redirected(uri, response) update_cluster http_client.default_redirect_uri_callback(uri, response) end |
#info(key) ⇒ Hash
Returns info about a key, such as TTL, expiration and index.
For keys with values the returned hash will include ‘:key`, `:value` and `:index`. Additionally for keys with a TTL set there will be a `:ttl` and `:expiration` (as a UTC `Time`).
For keys that represent directories with no direct decendants (e.g. “/foo” for “/foo/bar/baz”) the ‘:dir` key will have the value `true`.
For keys that represent directories with direct decendants (e.g. “/foo” for “/foo/bar”) a hash of keys and info will be returned.
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/etcd/client/protocol.rb', line 111 def info(key) data = request_data(:get, uri(key)) return nil unless data if nodes = data[S_NODE][S_NODES] nodes.each_with_object({}) do |d, acc| info = extract_info(d) info.delete(:action) acc[info[:key]] = info end else info = extract_info(data) info.delete(:action) info end end |
#inspect ⇒ Object
16 17 18 |
# File 'lib/etcd/client.rb', line 16 def inspect %Q(<Etcd::Client #{seed_uris}>) end |
#key_uri(key) ⇒ Object
160 161 162 |
# File 'lib/etcd/client/protocol.rb', line 160 def key_uri(key) uri(key, S_KEYS) end |
#leader_uri ⇒ Object
58 59 60 |
# File 'lib/etcd/client/failover.rb', line 58 def leader_uri leader && leader.client_urls && leader.client_urls.first end |
#observe(prefix, options = {}, &handler) ⇒ #cancel, #join
Sets up a continuous watch of a key or prefix.
This method works like #watch (which is used behind the scenes), but will re-watch the key or prefix after receiving a change notificiation.
When re-watching the index of the previous change notification is used, so no subsequent changes will be lost while a change is being processed.
Unlike #watch this method as asynchronous. The watch handler runs in a separate thread (currently a new thread is created for each invocation, keep this in mind if you need to watch many different keys), and can be cancelled by calling ‘#cancel` on the returned object.
Because of implementation details the watch handler thread will not be stopped directly when you call ‘#cancel`. The thread will be blocked until the next change notification (which will be ignored). This will have very little effect on performance since the thread will not be runnable. Unless you’re creating lots of observers it should not matter. If you want to make sure you wait for the thread to stop you can call ‘#join` on the returned object.
34 35 36 37 38 |
# File 'lib/etcd/client/observing.rb', line 34 def observe(prefix, = {}, &handler) ob = Observer.new(self, prefix, handler, ).tap(&:run) @observers[prefix] = ob ob end |
#observers_overview ⇒ Object
40 41 42 43 44 |
# File 'lib/etcd/client/observing.rb', line 40 def observers_overview observers.map do |_, observer| observer.pp_status end end |
#refresh_observers ⇒ Object
Re-initiates watches after leader election
52 53 54 55 56 57 |
# File 'lib/etcd/client/observing.rb', line 52 def refresh_observers logger.debug("refresh_observers: enter") observers.each do |_, observer| observer.rerun unless observer.status end end |
#refresh_observers_if_needed ⇒ Object
46 47 48 |
# File 'lib/etcd/client/observing.rb', line 46 def refresh_observers_if_needed refresh_observers if observers.values.any?{|x| not x.status} end |
#set(key, value, options = {}) ⇒ String
Sets the value of a key.
Accepts an optional ‘:ttl` which is the number of seconds that the key should live before being automatically deleted.
24 25 26 27 28 29 |
# File 'lib/etcd/client/protocol.rb', line 24 def set(key, value, ={}) body = {:value => value} body[:ttl] = [:ttl] if [:ttl] data = request_data(:PUT, key_uri(key), body: body) data[S_PREV_NODE][S_VALUE] if data && data[S_PREV_NODE] end |
#start_heartbeat_if_needed ⇒ Object
62 63 64 65 66 |
# File 'lib/etcd/client/failover.rb', line 62 def start_heartbeat_if_needed logger.debug("client - starting heartbeat") @heartbeat = Etcd::Heartbeat.new(self, @heartbeat_freq) @heartbeat.start_heartbeat_if_needed end |
#update(key, value, expected_value, options = {}) ⇒ true, false
Atomically sets the value for a key if the current value for the key matches the specified expected value.
Returns ‘true` when the operation succeeds, i.e. when the specified expected value matches the current value. Returns `false` otherwise.
Accepts an optional ‘:ttl` which is the number of seconds that the key should live before being automatically deleted.
67 68 69 70 71 72 |
# File 'lib/etcd/client/protocol.rb', line 67 def update(key, value, expected_value, ={}) body = {:value => value, :prevValue => expected_value} body[:ttl] = [:ttl] if [:ttl] data = request_data(:put, key_uri(key), body: body) !! data end |
#update_cluster ⇒ Object
Creates a Cluster-instance from ‘@seed_uris` and stores the cluster leader information
37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/etcd/client/failover.rb', line 37 def update_cluster logger.debug("update_cluster: enter") begin @cluster = Etcd::Cluster.init_from_uris(*seed_uris) @leader = @cluster.leader @status = :up logger.debug("update_cluster: after success") refresh_observers @cluster rescue AllNodesDownError => e logger.debug("update_cluster: failed") raise e end end |
#watch(prefix, options = {}) {|value, key, info| ... } ⇒ Object
Watches a key or prefix and calls the given block when with any changes.
This method will block until the server replies. There is no way to cancel the call.
The parameters to the block are the value, the key and a hash of additional info. The info will contain the ‘:action` that caused the change (`:set`, `:delete` etc.), the `:key`, the `:value`, the `:index`, `:new_key` with the value `true` when a new key was created below the watched prefix, `:previous_value`, if any, `:ttl` and `:expiration` if applicable.
The reason why the block parameters are in the order`value`, ‘key` instead of `key`, `value` is because you almost always want to get the new value when you watch, but not always the key, and most often not the info. With this order you can leave out the parameters you don’t need.
151 152 153 154 155 156 157 |
# File 'lib/etcd/client/protocol.rb', line 151 def watch(prefix, ={}) .merge!(wait: 'true') .delete(:index) if .has_key?(:index) && [:index].nil? data = request_data(:get, watch_uri(prefix), query: ) info = extract_info(data) yield info[:value], info[:key], info end |
#watch_uri(key) ⇒ Object
164 165 166 |
# File 'lib/etcd/client/protocol.rb', line 164 def watch_uri(key) uri(key, S_KEYS) end |