Class: Etcd::Client

Inherits:
Object
Includes:
Constants, Loggable, Requestable
Defined in:
lib/etcd/client.rb,
lib/etcd/client/failover.rb,
lib/etcd/client/protocol.rb,
lib/etcd/client/observing.rb

Constant Summary

Constants included from Constants

Etcd::Constants::S_ACTION, Etcd::Constants::S_AND, Etcd::Constants::S_DIR, Etcd::Constants::S_EXPIRATION, Etcd::Constants::S_INDEX, Etcd::Constants::S_KEY, Etcd::Constants::S_KEYS, Etcd::Constants::S_LOCATION, Etcd::Constants::S_NODE, Etcd::Constants::S_NODES, Etcd::Constants::S_PREV_NODE, Etcd::Constants::S_SLASH, Etcd::Constants::S_TTL, Etcd::Constants::S_VALUE

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Methods included from Loggable

#logger, #reset_logger!

Methods included from Requestable

#http_client, #request, #reset_http_client!

Constructor Details

#initialize(options = {}) ⇒ Client

Returns a new instance of Client.

Parameters:

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :uris (Array) — default: ['http://127.0.0.1:4001']

    seed URIs of the etcd cluster nodes

  • :heartbeat_freq (Float) — default: 0.0

    check frequency for leader status (in seconds). Heartbeating will start only for non-zero values.



# File 'lib/etcd/client/failover.rb', line 7

def initialize(options={})
  @observers      = {}
  @seed_uris      = options[:uris] || ['http://127.0.0.1:4001']
  @heartbeat_freq = options[:heartbeat_freq].to_f
  http_client.redirect_uri_callback = method(:handle_redirected)
end
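
The option handling above can be traced with a small standalone sketch. The helper name `resolve_client_options` and the constant `DEFAULT_URIS` are hypothetical and not part of the library; they only mirror the two assignments in the constructor to show that a missing `:heartbeat_freq` becomes `0.0` (because `nil.to_f` is `0.0`), which leaves heartbeating off.

```ruby
# Hypothetical helper (not part of the library) mirroring the defaults
# applied in Etcd::Client#initialize.
DEFAULT_URIS = ['http://127.0.0.1:4001'].freeze

def resolve_client_options(options = {})
  {
    # Fall back to the local default node when no seed URIs are given.
    seed_uris: options[:uris] || DEFAULT_URIS,
    # nil.to_f is 0.0, so an absent option yields a zero frequency.
    heartbeat_freq: options[:heartbeat_freq].to_f
  }
end

# No options: default seed URI and a heartbeat frequency of 0.0.
resolve_client_options

# Explicit options: the given URIs, and the Integer 5 coerced to 5.0.
resolve_client_options(uris: ['http://10.0.0.1:4001'], heartbeat_freq: 5)
```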

Instance Attribute Details

#cluster ⇒ Object

Returns the value of attribute cluster.



# File 'lib/etcd/client.rb', line 9

def cluster
  @cluster
end

#heartbeat_freq ⇒ Object

Returns the value of attribute heartbeat_freq.



# File 'lib/etcd/client.rb', line 12

def heartbeat_freq
  @heartbeat_freq
end

#leader ⇒ Object

A somewhat magic accessor method:

  • will reinitialize the leader and the cluster if needed



# File 'lib/etcd/client/failover.rb', line 54

def leader
  @leader
end

#observers ⇒ Object

Returns the value of attribute observers.



# File 'lib/etcd/client.rb', line 13

def observers
  @observers
end

#seed_uris ⇒ Object

Returns the value of attribute seed_uris.



# File 'lib/etcd/client.rb', line 11

def seed_uris
  @seed_uris
end

#status ⇒ Object

The client status, either `:up` or `:down`.



# File 'lib/etcd/client.rb', line 14

def status
  @status
end

Class Method Details

.connect(options = {}) ⇒ Object

Create a new client and connect it to the etcd cluster.

This method is the preferred way to create a new client, and is the equivalent of `Client.new(options).connect`. See #initialize and #connect for options and details.



# File 'lib/etcd/client/failover.rb', line 22

def self.connect(options={})
  self.new(options).connect
end

Instance Method Details

#connect ⇒ Object

Connects to the etcd cluster and returns the client itself.



# File 'lib/etcd/client/failover.rb', line 29

def connect
  update_cluster
  start_heartbeat_if_needed
  self
end

#delete(key) ⇒ String

Remove a key and its value.

The previous value is returned, or `nil` if the key did not exist.

Parameters:

  • key (String)

    the key to remove

Returns:

  • (String)

    the previous value, if any



# File 'lib/etcd/client/protocol.rb', line 80

def delete(key)
  data = request_data(:delete, key_uri(key))
  return nil unless data
  data[S_PREV_NODE][S_VALUE]
end

#exists?(key) ⇒ true, false

Returns true if the specified key exists.

This is a convenience method and equivalent to calling #get and checking if the value is `nil`.

Returns:

  • (true, false)

    whether or not the specified key exists



# File 'lib/etcd/client/protocol.rb', line 92

def exists?(key)
  !!get(key)
end
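
Because the result of #get is coerced with `!!`, only `nil` (and `false`) count as missing. The sketch below uses a plain Hash in place of a cluster; `FakeStore` is a hypothetical stand-in, not a library class.

```ruby
# Hypothetical in-memory stand-in for a client; only #get and #exists?
# are sketched here.
class FakeStore
  def initialize(data)
    @data = data
  end

  # Returns the stored value, or nil when the key is absent.
  def get(key)
    @data[key]
  end

  # Same coercion as in the source above: nil becomes false,
  # any other value becomes true.
  def exists?(key)
    !!get(key)
  end
end

store = FakeStore.new('/foo' => 'bar', '/empty' => '')
store.exists?('/foo')     # true
store.exists?('/missing') # false
store.exists?('/empty')   # true, since an empty String is truthy in Ruby
```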

#get(key) ⇒ String, Hash

Gets the value or values for a key.

If the key represents a directory with direct descendants (e.g. “/foo” for “/foo/bar”), a hash of keys and values will be returned.

Parameters:

  • key (String)

    the key or prefix to retrieve

Returns:

  • (String, Hash)

    the value for the key, or a hash of keys and values when the key is a prefix.



# File 'lib/etcd/client/protocol.rb', line 39

def get(key)
  data = request_data(:get, key_uri(key))
  return nil unless data
  if nodes = data[S_NODE][S_NODES]
    nodes.each_with_object({}) do |node, acc|
      acc[node[S_KEY]] = node[S_VALUE]
    end
  else
    data[S_NODE][S_VALUE]
  end
end
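
The branching in #get can be followed on sample data. This is a sketch under assumptions: the string keys `'node'`, `'nodes'`, `'key'` and `'value'` follow the etcd v2 JSON response layout and are assumed to match the `S_NODE`, `S_NODES`, `S_KEY` and `S_VALUE` constants, and `values_from` is a hypothetical helper that takes already parsed response data.

```ruby
# Hypothetical helper reproducing the value extraction in #get.
# Key names are assumed to match the etcd v2 JSON response layout.
def values_from(data)
  return nil unless data

  node = data['node']
  if (nodes = node['nodes'])
    # Directory: map each direct child key to its value.
    nodes.each_with_object({}) { |child, acc| acc[child['key']] = child['value'] }
  else
    # Plain key: return its value.
    node['value']
  end
end

leaf = { 'node' => { 'key' => '/foo/bar', 'value' => 'baz' } }
dir  = { 'node' => { 'key' => '/foo', 'dir' => true,
                     'nodes' => [{ 'key' => '/foo/bar', 'value' => 'baz' },
                                 { 'key' => '/foo/qux', 'value' => '1' }] } }

values_from(leaf) # the String 'baz'
values_from(dir)  # a Hash mapping '/foo/bar' to 'baz' and '/foo/qux' to '1'
values_from(nil)  # nil, as when the request returned no data
```

Note that a child which is itself a directory carries no value in this layout, so it would map to `nil` in the returned hash.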

#handle_redirected(uri, response) ⇒ Object

Called only when a write is attempted against a follower node in the cluster. This means:

  • the leader has changed since the last cluster update

Solution: fetch a fresh cluster status, then follow the redirect.



# File 'lib/etcd/client/failover.rb', line 71

def handle_redirected(uri, response)
  update_cluster
  http_client.default_redirect_uri_callback(uri, response)
end

#info(key) ⇒ Hash

Returns info about a key, such as TTL, expiration and index.

For keys with values the returned hash will include `:key`, `:value` and `:index`. Additionally, for keys with a TTL set there will be a `:ttl` and `:expiration` (as a UTC `Time`).

For keys that represent directories with no direct descendants (e.g. “/foo” for “/foo/bar/baz”) the `:dir` key will have the value `true`.

For keys that represent directories with direct descendants (e.g. “/foo” for “/foo/bar”) a hash of keys and info will be returned.

Parameters:

  • key (String)

    the key or prefix to retrieve

Returns:

  • (Hash)

    a hash with info about the key; the exact contents depend on what kind of key it is.



# File 'lib/etcd/client/protocol.rb', line 111

def info(key)
  data = request_data(:get, uri(key))
  return nil unless data
  if nodes = data[S_NODE][S_NODES]
    nodes.each_with_object({}) do |d, acc|
      info = extract_info(d)
      info.delete(:action)
      acc[info[:key]] = info
    end
  else
    info = extract_info(data)
    info.delete(:action)
    info
  end
end

#inspect ⇒ Object



# File 'lib/etcd/client.rb', line 16

def inspect
  %Q(<Etcd::Client #{seed_uris}>)
end

#key_uri(key) ⇒ Object



# File 'lib/etcd/client/protocol.rb', line 160

def key_uri(key)
  uri(key, S_KEYS)
end

#leader_uri ⇒ Object



# File 'lib/etcd/client/failover.rb', line 58

def leader_uri
  leader && leader.client_urls && leader.client_urls.first
end

#observe(prefix, options = {}, &handler) ⇒ #cancel, #join

Sets up a continuous watch of a key or prefix.

This method works like #watch (which is used behind the scenes), but will re-watch the key or prefix after receiving a change notification.

When re-watching, the index of the previous change notification is used, so no subsequent changes will be lost while a change is being processed.

Unlike #watch, this method is asynchronous. The watch handler runs in a separate thread (currently a new thread is created for each invocation; keep this in mind if you need to watch many different keys), and can be cancelled by calling `#cancel` on the returned object.

Because of implementation details, the watch handler thread will not be stopped directly when you call `#cancel`. The thread will be blocked until the next change notification (which will be ignored). This will have very little effect on performance since the thread will not be runnable. Unless you’re creating lots of observers it should not matter. If you want to make sure you wait for the thread to stop, you can call `#join` on the returned object.

Examples:

Creating and cancelling an observer

observer = client.observe('/foo') do |value|
  # do something on changes
end
# ...
observer.cancel

Returns:

  • (#cancel, #join)

    an observer object which you can call cancel and join on



# File 'lib/etcd/client/observing.rb', line 34

def observe(prefix, options = {}, &handler)
  ob = Observer.new(self, prefix, handler, options).tap(&:run)
  @observers[prefix] = ob
  ob
end
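
The re-watch loop and the delayed effect of `#cancel` described for #observe can be modelled without a cluster. This is a minimal sketch, not the library's `Observer` class: `SketchObserver` is hypothetical, the `source` callable stands in for a blocking #watch call, and resuming at the last seen index plus one is an assumption about how the next watch is positioned.

```ruby
# Hypothetical sketch of a re-watching observer; not the library's Observer.
# `source` is any callable that blocks until the next change and returns a
# Hash with :value, :key and :index.
class SketchObserver
  def initialize(source, &handler)
    @source  = source
    @handler = handler
    @running = true
    @index   = nil
  end

  # Starts the watch loop in its own thread and returns the observer.
  def run
    @thread = Thread.new do
      while @running
        change = @source.call(@index) # blocks until something changes
        break unless @running         # cancelled while blocked: ignore this change
        @handler.call(change[:value], change[:key], change)
        @index = change[:index] + 1   # assumed: resume right after the last seen change
      end
    end
    self
  end

  # Only flips a flag; the thread exits once its blocking call returns.
  def cancel
    @running = false
    self
  end

  # Waits for the watch thread to finish.
  def join
    @thread.join
    self
  end
end

# Usage: a Queue plays the role of the server pushing change notifications.
changes  = Queue.new
seen     = Queue.new
observer = SketchObserver.new(->(_index) { changes.pop }) { |value| seen << value }.run

changes << { value: 'a', key: '/foo', index: 1 }
seen.pop         # blocks until the handler has processed 'a'
observer.cancel  # the thread may still be blocked waiting for a change
changes << { value: 'ignored', key: '/foo', index: 2 } # unblocks it; not delivered
observer.join
```

The final notification is needed only to release a thread that is still blocked in the source call, which matches the behaviour described above: cancelling takes effect at the next change notification, and that notification is dropped.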

#observers_overview ⇒ Object



# File 'lib/etcd/client/observing.rb', line 40

def observers_overview
  observers.map do |_, observer|
    observer.pp_status
  end
end

#refresh_observers ⇒ Object

Re-initiates watches after leader election



# File 'lib/etcd/client/observing.rb', line 52

def refresh_observers
  logger.debug("refresh_observers: enter")
  observers.each do |_, observer|
    observer.rerun unless observer.status
  end
end

#refresh_observers_if_needed ⇒ Object



# File 'lib/etcd/client/observing.rb', line 46

def refresh_observers_if_needed
  refresh_observers if observers.values.any?{|x| not x.status}
end

#set(key, value, options = {}) ⇒ String

Sets the value of a key.

Accepts an optional `:ttl` which is the number of seconds that the key should live before being automatically deleted.

Parameters:

  • key (String)

    the key to set

  • value (String)

    the value to set

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :ttl (Fixnum) — default: nil

    an optional time to live (in seconds) for the key

Returns:

  • (String)

    The previous value (if any)



# File 'lib/etcd/client/protocol.rb', line 24

def set(key, value, options={})
  body       = {:value => value}
  body[:ttl] = options[:ttl] if options[:ttl]
  data       = request_data(:PUT, key_uri(key), body: body)
  data[S_PREV_NODE][S_VALUE] if data && data[S_PREV_NODE]
end
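
How the optional `:ttl` ends up in the request can be seen in isolation. The helper `build_set_body` is hypothetical; it only repeats the body assembly from the source above, without sending anything.

```ruby
# Hypothetical helper mirroring how #set assembles the request body.
def build_set_body(value, options = {})
  body = { value: value }
  # :ttl is only added when the caller supplied a truthy value for it.
  body[:ttl] = options[:ttl] if options[:ttl]
  body
end

build_set_body('bar')           # body contains only :value
build_set_body('bar', ttl: 30)  # body contains :value and a :ttl of 30
build_set_body('bar', ttl: nil) # a nil :ttl is left out, same as omitting it
```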

#start_heartbeat_if_needed ⇒ Object



# File 'lib/etcd/client/failover.rb', line 62

def start_heartbeat_if_needed
  logger.debug("client - starting heartbeat")
  @heartbeat = Etcd::Heartbeat.new(self, @heartbeat_freq)
  @heartbeat.start_heartbeat_if_needed
end

#update(key, value, expected_value, options = {}) ⇒ true, false

Atomically sets the value for a key if the current value for the key matches the specified expected value.

Returns `true` when the operation succeeds, i.e. when the specified expected value matches the current value. Returns `false` otherwise.

Accepts an optional `:ttl` which is the number of seconds that the key should live before being automatically deleted.

Parameters:

  • key (String)

    the key to set

  • value (String)

    the value to set

  • expected_value (String)

    the value to compare to the current value

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :ttl (Fixnum) — default: nil

    an optional time to live (in seconds) for the key

Returns:

  • (true, false)

    whether or not the operation succeeded



# File 'lib/etcd/client/protocol.rb', line 67

def update(key, value, expected_value, options={})
  body       = {:value => value, :prevValue => expected_value}
  body[:ttl] = options[:ttl] if options[:ttl]
  data       = request_data(:put, key_uri(key), body: body)
  !! data
end
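
The compare-and-set contract of #update can be modelled locally. In the client, the comparison is delegated to the server through the `:prevValue` field shown above; the sketch below only imitates the resulting semantics with an in-memory hash guarded by a mutex. `CasStore` is a hypothetical class, not part of the library.

```ruby
# Hypothetical in-memory model of the compare-and-set semantics behind #update.
class CasStore
  def initialize
    @data = {}
    @lock = Mutex.new
  end

  def set(key, value)
    @lock.synchronize { @data[key] = value }
  end

  def get(key)
    @lock.synchronize { @data[key] }
  end

  # Writes value only if the current value equals expected_value.
  # The comparison and the write happen under one lock, so no other
  # writer can slip in between them.
  def update(key, value, expected_value)
    @lock.synchronize do
      return false unless @data[key] == expected_value

      @data[key] = value
      true
    end
  end
end

store = CasStore.new
store.set('/counter', '1')
store.update('/counter', '2', '1') # true: the current value was '1'
store.update('/counter', '3', '1') # false: the value is now '2', nothing is written
store.get('/counter')              # '2'
```

A typical use of this contract is an optimistic retry loop: read the value, compute the new one, and call the update again with a fresh read whenever it returns `false`.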

#update_cluster ⇒ Object

Creates a Cluster instance from `@seed_uris` and stores the cluster leader information.



# File 'lib/etcd/client/failover.rb', line 37

def update_cluster
  logger.debug("update_cluster: enter")
  begin
    @cluster = Etcd::Cluster.init_from_uris(*seed_uris)
    @leader  = @cluster.leader
    @status  = :up
    logger.debug("update_cluster: after success")
    refresh_observers
    @cluster
  rescue AllNodesDownError => e
    logger.debug("update_cluster: failed")
    raise e
  end
end

#watch(prefix, options = {}) {|value, key, info| ... } ⇒ Object

Watches a key or prefix and calls the given block with any changes.

This method will block until the server replies. There is no way to cancel the call.

The parameters to the block are the value, the key and a hash of additional info. The info will contain the `:action` that caused the change (`:set`, `:delete` etc.), the `:key`, the `:value`, the `:index`, `:new_key` with the value `true` when a new key was created below the watched prefix, `:previous_value`, if any, and `:ttl` and `:expiration` if applicable.

The block parameters are in the order `value`, `key` instead of `key`, `value` because you almost always want to get the new value when you watch, but not always the key, and most often not the info. With this order you can leave out the parameters you don’t need.

Parameters:

  • prefix (String)

    the key or prefix to watch

  • options (Hash) (defaults to: {})

Options Hash (options):

  • :index (Fixnum) — default: nil

    the index to start watching from

Yield Parameters:

  • value (String)

    the value of the key that changed

  • key (String)

    the key that changed

  • info (Hash)

    the info for the key that changed

Returns:

  • (Object)

    the result of the given block



# File 'lib/etcd/client/protocol.rb', line 151

def watch(prefix, options={})
  options.merge!(wait: 'true')
  options.delete(:index) if options.has_key?(:index) && options[:index].nil?
  data = request_data(:get, watch_uri(prefix), query: options)
  info = extract_info(data)
  yield info[:value], info[:key], info
end
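
The parameter order described for #watch relies on ordinary Ruby block semantics: a block may declare fewer parameters than are yielded, and the extra values are dropped. The method `fake_watch` below is a hypothetical stand-in that yields a fixed change instead of waiting for a server; only the yield order is taken from the source above.

```ruby
# Hypothetical stand-in for #watch: yields one hard-coded change.
def fake_watch
  info = { action: :set, key: '/foo/bar', value: 'baz', index: 7 }
  yield info[:value], info[:key], info
end

# Blocks declare only the leading parameters they care about.
fake_watch { |value| value }                      # 'baz'
fake_watch { |value, key| "#{key}=#{value}" }     # '/foo/bar=baz'
fake_watch { |_value, _key, info| info[:index] }  # 7
```

As with #watch, the return value of each call is the result of the block.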

#watch_uri(key) ⇒ Object



# File 'lib/etcd/client/protocol.rb', line 164

def watch_uri(key)
  uri(key, S_KEYS)
end