Class: Riak::Client

Inherits:
Object show all
Includes:
Util::Escape, Util::Translation
Defined in:
lib/riak/client.rb,
lib/riak/client/node.rb,
lib/riak/client/search.rb,
lib/riak/client/decaying.rb,
lib/riak/client/http_backend.rb,
lib/riak/client/excon_backend.rb,
lib/riak/client/net_http_backend.rb,
lib/riak/client/beefcake/messages.rb,
lib/riak/client/feature_detection.rb,
lib/riak/client/protobuffs_backend.rb,
lib/riak/client/beefcake/object_methods.rb,
lib/riak/client/http_backend/key_streamer.rb,
lib/riak/client/http_backend/configuration.rb,
lib/riak/client/beefcake_protobuffs_backend.rb,
lib/riak/client/http_backend/object_methods.rb,
lib/riak/client/http_backend/request_headers.rb,
lib/riak/client/http_backend/transport_methods.rb

Overview

A client connection to Riak.

Defined Under Namespace

Modules: FeatureDetection Classes: BeefcakeProtobuffsBackend, Decaying, ExconBackend, HTTPBackend, LuwakFile, NetHTTPBackend, Node, ProtobuffsBackend

Constant Summary collapse

MAX_CLIENT_ID =

When using integer client IDs, the exclusive upper-bound of valid values.

4294967296
PROTOCOLS =

Array of valid protocols

%w[http https pbc]
HOST_REGEX =

Regexp for validating hostnames, lifted from uri.rb in Ruby 1.8.6

/^(?:(?:(?:[a-zA-Z\d](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.)*(?:[a-zA-Z](?:[-a-zA-Z\d]*[a-zA-Z\d])?)\.?|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|\[(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})|(?:(?:[a-fA-F\d]{1,4}:)*[a-fA-F\d]{1,4})?::(?:(?:[a-fA-F\d]{1,4}:)*(?:[a-fA-F\d]{1,4}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}))?)\])$/n
VALID_OPTIONS =

Valid constructor options.

[:protocol, :nodes, :client_id, :http_backend, :protobuffs_backend] | Node::VALID_OPTIONS
NETWORK_ERRORS =

Network errors.

[
  EOFError,
  Errno::ECONNABORTED,
  Errno::ECONNREFUSED,
  Errno::ECONNRESET,
  Errno::ENETDOWN,
  Errno::ENETRESET,
  Errno::ENETUNREACH,
  SocketError,
  SystemCallError,
]
Pool =
::Innertube::Pool

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util::Escape

#escape, #maybe_escape, #maybe_unescape, #unescape

Methods included from Util::Translation

#i18n_scope, #t

Constructor Details

#initialize(options = {}) ⇒ Client

Creates a client connection to Riak

Parameters:

  • options (Hash) (defaults to: {})

    configuration options for the client

Options Hash (options):

  • :nodes (Array)

    A list of nodes this client connects to. Each element of the list is a hash which is passed to Node.new, e.g. ‘127.0.0.1’, pb_port: 1234, …. If no nodes are given, a single node is constructed from the remaining options given to Client.new.

  • :host (String) — default: '127.0.0.1'

    The host or IP address for the Riak endpoint

  • :protocol (String) — default: 'http'

    The protocol to use for connecting to a node backend

  • :http_port (Fixnum) — default: 8098

    The port of the Riak HTTP endpoint

  • :pb_port (Fixnum) — default: 8087

    The port of the Riak Protocol Buffers endpoint

  • :prefix (String) — default: '/riak/'

    The URL path prefix to the main HTTP endpoint

  • :mapred (String) — default: '/mapred'

    The path to the map-reduce HTTP endpoint

  • :client_id (Fixnum, String) — default: rand(MAX_CLIENT_ID)

    The internal client ID used by Riak to route responses

  • :http_backend (String, Symbol) — default: :NetHTTP

    which HTTP backend to use

  • :protobuffs_backend (String, Symbol) — default: :Beefcake

    which Protocol Buffers backend to use

  • :ssl (Boolean, Hash) — default: nil

    The SSL options to pass to each node or true for default options

Raises:

  • (ArgumentError)

    raised if any invalid options are given



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/riak/client.rb', line 91

def initialize(options={})
  if options.include? :port
    warn(t('deprecated.port', :backtrace => caller[0..2].join("\n    ")))
  end

  unless (evil = options.keys - VALID_OPTIONS).empty?
    raise ArgumentError, "#{evil.inspect} are not valid options for Client.new"
  end

  @nodes = (options[:nodes] || []).map do |n|
    Client::Node.new self, n
  end
  if @nodes.empty? or options[:host] or options[:http_port] or options[:pb_port]
    @nodes |= [Client::Node.new(self, options)]
  end

  @protobuffs_pool = Pool.new(
                              method(:new_protobuffs_backend),
                              lambda { |b| b.teardown }
                              )

  @http_pool = Pool.new(
                        method(:new_http_backend),
                        lambda { |b| b.teardown }
                        )

  self.protocol           = options[:protocol]           || "http"
  self.http_backend       = options[:http_backend]       || :NetHTTP
  self.protobuffs_backend = options[:protobuffs_backend] || :Beefcake
  self.client_id          = options[:client_id]          if options[:client_id]
  self.ssl                = options[:ssl]                if options[:ssl]
end

Instance Attribute Details

#client_idString

Returns The internal client ID used by Riak to route responses.

Returns:

  • (String)

    The internal client ID used by Riak to route responses



59
60
61
# File 'lib/riak/client.rb', line 59

def client_id
  @client_id
end

#http_backendSymbol

Returns The HTTP backend/client to use.

Returns:

  • (Symbol)

    The HTTP backend/client to use



62
63
64
# File 'lib/riak/client.rb', line 62

def http_backend
  @http_backend
end

#http_poolClient::Pool (readonly)

Returns A pool of HTTP connections.

Returns:



65
66
67
# File 'lib/riak/client.rb', line 65

def http_pool
  @http_pool
end

#nodesArray

Returns The set of Nodes this client can communicate with.

Returns:

  • (Array)

    The set of Nodes this client can communicate with.



56
57
58
# File 'lib/riak/client.rb', line 56

def nodes
  @nodes
end

#protobuffs_backendSymbol

Returns The Protocol Buffers backend/client to use.

Returns:

  • (Symbol)

    The Protocol Buffers backend/client to use



68
69
70
# File 'lib/riak/client.rb', line 68

def protobuffs_backend
  @protobuffs_backend
end

#protobuffs_poolClient::Pool (readonly)

Returns A pool of protobuffs connections.

Returns:



71
72
73
# File 'lib/riak/client.rb', line 71

def protobuffs_pool
  @protobuffs_pool
end

#protocolString

Returns The protocol to use for the Riak endpoint.

Returns:

  • (String)

    The protocol to use for the Riak endpoint



53
54
55
# File 'lib/riak/client.rb', line 53

def protocol
  @protocol
end

Instance Method Details

#backend {|HTTPBackend, ProtobuffsBackend| ... } ⇒ Object

Yields a backend for operations that are protocol-independent. You can change which type of backend is used by setting the #protocol.

Yields:



128
129
130
131
132
133
134
135
# File 'lib/riak/client.rb', line 128

def backend(&block)
  case @protocol.to_s
  when /https?/i
    http &block
  when /pbc/i
    protobuffs &block
  end
end

#basic_auth=(auth) ⇒ Object

Sets basic HTTP auth on all nodes.



138
139
140
141
142
143
# File 'lib/riak/client.rb', line 138

def basic_auth=(auth)
  @nodes.each do |node|
    node.basic_auth = auth
  end
  auth
end

#bucket(name, options = {}) ⇒ Bucket Also known as: []

Retrieves a bucket from Riak.

Parameters:

  • bucket (String)

    the bucket to retrieve

  • options (Hash) (defaults to: {})

    options for retrieving the bucket

Options Hash (options):

  • :props (Boolean) — default: false

    whether to retreive the bucket properties

Returns:

  • (Bucket)

    the requested bucket



150
151
152
153
154
155
156
157
158
# File 'lib/riak/client.rb', line 150

def bucket(name, options={})
  unless (options.keys - [:props]).empty?
    raise ArgumentError, "invalid options"
  end
  @bucket_cache ||= {}
  (@bucket_cache[name] ||= Bucket.new(self, name)).tap do |b|
    b.props if options[:props]
  end
end

#bucketsArray<Bucket> Also known as: list_buckets

Note:

This is an expensive operation and should be used only in development.

Lists buckets which have keys stored in them.

Returns:



165
166
167
168
169
170
# File 'lib/riak/client.rb', line 165

def buckets
  warn(t('list_buckets', :backtrace => caller.join("\n    "))) unless Riak.disable_list_keys_warnings
  backend do |b|
    b.list_buckets.map {|name| Bucket.new(self, name) }
  end
end

#choose_node(nodes = self.nodes) ⇒ Object

Choose a node from a set.



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/riak/client.rb', line 174

def choose_node(nodes = self.nodes)
  # Prefer nodes which have gone a reasonable time without errors.
  s = nodes.select do |node|
    node.error_rate.value < 0.1
  end

  if s.empty?
    # Fall back to minimally broken node.
    nodes.min_by do |node|
      node.error_rate.value
    end
  else
    s[rand(s.size)]
  end
end

#delete_file(filename) ⇒ Object

Deletes a file stored via the “Luwak” interface

Parameters:

  • filename (String)

    the key/filename to delete



222
223
224
225
226
227
# File 'lib/riak/client.rb', line 222

def delete_file(filename)
  http do |h|
    h.delete_file(filename)
  end
  true
end

#delete_object(bucket, key, options = {}) ⇒ Object

Delete an object. See Bucket#delete



230
231
232
233
234
# File 'lib/riak/client.rb', line 230

def delete_object(bucket, key, options = {})
  backend do |b|
    b.delete_object(bucket, key, options)
  end
end

#file_exists?(key) ⇒ true, false Also known as: file_exist?

Checks whether a file exists in “Luwak”.

Parameters:

  • key (String)

    the key to check

Returns:

  • (true, false)

    whether the key exists in “Luwak”



239
240
241
242
243
# File 'lib/riak/client.rb', line 239

def file_exists?(key)
  http do |h|
    h.file_exists?(key)
  end
end

#get_bucket_props(bucket) ⇒ Object

Bucket properties. See Bucket#props



247
248
249
250
251
# File 'lib/riak/client.rb', line 247

def get_bucket_props(bucket)
  backend do |b|
    b.get_bucket_props bucket
  end
end

#get_file(filename) {|chunk| ... } ⇒ IO?

Retrieves a large file/IO object from Riak via the “Luwak” interface. Streams the data to a temporary file unless a block is given.

Parameters:

  • filename (String)

    the key/filename for the object

Yields:

  • (chunk)

    stream contents of the file through the block. Passing the block will result in nil being returned from the method.

Yield Parameters:

  • chunk (String)

    a single chunk of the object’s data

Returns:

  • (IO, nil)

    the file (also having content_type and original_filename accessors). The file will need to be reopened to be read. nil will be returned if a block is given.



264
265
266
267
268
# File 'lib/riak/client.rb', line 264

def get_file(filename, &block)
  http do |h|
    h.get_file(filename, &block)
  end
end

#get_index(bucket, index, query) ⇒ Object

Queries a secondary index on a bucket. See Bucket#get_index



271
272
273
274
275
# File 'lib/riak/client.rb', line 271

def get_index(bucket, index, query)
  backend do |b|
    b.get_index bucket, index, query
  end
end

#get_object(bucket, key, options = {}) ⇒ Object

Get an object. See Bucket#get



278
279
280
281
282
# File 'lib/riak/client.rb', line 278

def get_object(bucket, key, options = {})
  backend do |b|
    b.fetch_object(bucket, key, options)
  end
end

#http(&block) ⇒ Object

Yields an HTTPBackend.



285
286
287
# File 'lib/riak/client.rb', line 285

def http(&block)
  recover_from @http_pool, &block
end

#index(index, *docs) ⇒ Object #index(*docs) ⇒ Object Also known as: add_doc

(Riak Search) Adds documents to a search index via the Solr interface.

Overloads:

  • #index(index, *docs) ⇒ Object

    Adds documents to the specified search index

    Parameters:

    • index (String)

      the index in which to add/update the given documents

    • docs (Array<Hash>)

      unnested document hashes, with one key per field

  • #index(*docs) ⇒ Object

    Adds documents to the default search index

    Parameters:

    • docs (Array<Hash>)

      unnested document hashes, with one key per field

Raises:

  • (ArgumentError)

    if any documents don’t include ‘id’ key



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/riak/client/search.rb', line 38

def index(*args)
  index = args.shift if String === args.first # Documents must be hashes of fields
  raise ArgumentError.new(t("search_docs_require_id")) unless args.all? {|d| d.key?("id") || d.key?(:id) }
  xml = Builder::XmlMarkup.new
  xml.add do
    args.each do |doc|
      xml.doc do
        doc.each do |k,v|
          xml.field('name' => k.to_s) { xml.text!(v.to_s) }
        end
      end
    end
  end
  http do |h|
    h.update_search_index(index, xml.target!)
  end
  true
end

#inspectString

Returns A representation suitable for IRB and debugging output.

Returns:

  • (String)

    A representation suitable for IRB and debugging output.



298
299
300
# File 'lib/riak/client.rb', line 298

def inspect
  "#<Riak::Client #{nodes.inspect}>"
end

Link-walk.



303
304
305
306
307
# File 'lib/riak/client.rb', line 303

def link_walk(object, specs)
  http do |h|
    h.link_walk object, specs
  end
end

#list_keys(bucket, &block) ⇒ Object

Retrieves a list of keys in the given bucket. See Bucket#keys



310
311
312
313
314
315
316
317
318
319
320
# File 'lib/riak/client.rb', line 310

def list_keys(bucket, &block)
  if block_given?
    backend do |b|
      b.list_keys bucket, &block
    end
  else
    backend do |b|
      b.list_keys bucket
    end
  end
end

#mapred(mr, &block) ⇒ Object

Executes a mapreduce request. See MapReduce#run



323
324
325
326
327
# File 'lib/riak/client.rb', line 323

def mapred(mr, &block)
  backend do |b|
    b.mapred(mr, &block)
  end
end

#new_http_backendHTTPBackend

Creates a new HTTP backend.

Returns:



331
332
333
334
335
336
337
338
339
340
341
342
343
344
# File 'lib/riak/client.rb', line 331

def new_http_backend
  klass = self.class.const_get("#{@http_backend}Backend")
  if klass.configured?
    node = choose_node(
      @nodes.select do |n|
        n.http?
      end
    )

    klass.new(self, node)
  else
    raise t('http_configuration', :backend => @http_backend)
  end
end

#new_protobuffs_backendProtobuffsBackend

Creates a new protocol buffers backend.

Returns:



349
350
351
352
353
354
355
356
357
358
359
360
361
362
# File 'lib/riak/client.rb', line 349

def new_protobuffs_backend
  klass = self.class.const_get("#{@protobuffs_backend}ProtobuffsBackend")
  if klass.configured?
    node = choose_node(
      @nodes.select do |n|
        n.protobuffs?
      end
    )

    klass.new(self, node)
  else
    raise t('protobuffs_configuration', :backend => @protobuffs_backend)
  end
end

#nodeNode

Returns An arbitrary Node.

Returns:

  • (Node)

    An arbitrary Node.



365
366
367
# File 'lib/riak/client.rb', line 365

def node
  nodes[rand nodes.size]
end

#pingtrue, false

Pings the Riak cluster to check for liveness.

Returns:

  • (true, false)

    whether the Riak cluster is alive and reachable



371
372
373
374
375
# File 'lib/riak/client.rb', line 371

def ping
  backend do |b|
    b.ping
  end
end

#protobuffs(&block) ⇒ Object

Yields a protocol buffers backend.



378
379
380
# File 'lib/riak/client.rb', line 378

def protobuffs(&block)
  recover_from @protobuffs_pool, &block
end

#recover_from(pool) ⇒ Object

Takes a pool. Acquires a backend from the pool and yields it with node-specific error recovery.



419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
# File 'lib/riak/client.rb', line 419

def recover_from(pool)
  skip_nodes = []
  take_opts = {}
  tries = 3

  begin
    # Only select nodes which we haven't used before.
    unless skip_nodes.empty?
      take_opts[:filter] = lambda do |backend|
        not skip_nodes.include? backend.node
      end
    end

    # Acquire a backend
    pool.take(take_opts) do |backend|
      begin
        yield backend
      rescue *NETWORK_ERRORS => e
        # Network error.
        tries -= 1

        # Notify the node that a request against it failed.
        backend.node.error_rate << 1

        # Skip this node next time.
        skip_nodes << backend.node

        # And delete this connection.
        raise Pool::BadResource, e
      end
    end
  rescue Pool::BadResource => e
    retry if tries > 0
    raise e.message
  end
end

#reload_object(object, options = {}) ⇒ Object

Reloads the object from Riak.



457
458
459
460
461
# File 'lib/riak/client.rb', line 457

def reload_object(object, options = {})
  backend do |b|
    b.reload_object(object, options)
  end
end

#remove(index, specs) ⇒ Object #remove(specs) ⇒ Object Also known as: delete_doc, deindex

(Riak Search) Removes documents from a search index via the Solr interface.

Overloads:

  • #remove(index, specs) ⇒ Object

    Removes documents from the specified index

    Parameters:

    • index (String)

      the index from which to remove documents

    • specs (Array<Hash>)

      the specificaiton of documents to remove (must contain ‘id’ or ‘query’ keys)

  • #remove(specs) ⇒ Object

    Removes documents from the default index

    Parameters:

    • specs (Array<Hash>)

      the specification of documents to remove (must contain ‘id’ or ‘query’ keys)

Raises:

  • (ArgumentError)

    if any document specs don’t include ‘id’ or ‘query’ keys



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/riak/client/search.rb', line 67

def remove(*args)
  index = args.shift if String === args.first
  raise ArgumentError.new(t("search_remove_requires_id_or_query")) unless args.all? { |s|
    s.include? :id or
    s.include? 'id' or
    s.include? :query or
    s.include? 'query'
  }
  xml = Builder::XmlMarkup.new
  xml.delete do
    args.each do |spec|
      spec.each do |k,v|
        xml.tag!(k.to_sym, v)
      end
    end
  end
  http do |h|
    h.update_search_index(index, xml.target!)
  end
  true
end

#search(index, query, options = {}) ⇒ Hash #search(query, options = {}) ⇒ Hash Also known as: select

(Riak Search) Performs a search via the Solr interface.

Overloads:

  • #search(index, query, options = {}) ⇒ Hash

    Parameters:

    • index (String)

      the index to query on

    • query (String)

      a Lucene query string

  • #search(query, options = {}) ⇒ Hash

    Queries the default index

    Parameters:

    • query (String)

      a Lucene query string

Parameters:

  • options (Hash)

    extra options for the Solr query

Returns:

  • (Hash)

    the query result, containing the ‘responseHeaders’ and ‘response’ keys



20
21
22
23
24
25
26
# File 'lib/riak/client/search.rb', line 20

def search(*args)
  options = args.extract_options!
  index, query = args[-2], args[-1]  # Allows nil index, while keeping it as firstargument
  backend do |b|
    b.search(index, query, options)
  end
end

#set_bucket_props(bucket, properties) ⇒ Object

Sets the properties on a bucket. See Bucket#props=



464
465
466
467
468
469
470
# File 'lib/riak/client.rb', line 464

def set_bucket_props(bucket, properties)
  # A bug in Beefcake is still giving us trouble with default booleans.
  # Until it is resolved, we'll use the HTTP backend.
  http do |b|
    b.set_bucket_props(bucket, properties)
  end
end

#ssl=(value) ⇒ Object

Enables or disables SSL on all nodes, for HTTP backends.



473
474
475
476
477
478
# File 'lib/riak/client.rb', line 473

def ssl=(value)
  @nodes.each do |node|
    node.ssl = value
  end
  value
end

#stampStamp

Exposes a Stamp object for use in generating unique identifiers.

Returns:

  • (Stamp)

    an ID generator

See Also:



484
485
486
# File 'lib/riak/client.rb', line 484

def stamp
  @stamp ||= Riak::Stamp.new(self)
end

#store_file(filename, content_type, data) ⇒ String #store_file(content_type, data) ⇒ String

Stores a large file/IO-like object in Riak via the “Luwak” interface.

Overloads:

  • #store_file(filename, content_type, data) ⇒ String

    Stores the file at the given key/filename

    Parameters:

    • filename (String)

      the key/filename for the object

    • content_type (String)

      the MIME Content-Type for the data

    • data (IO, String)

      the contents of the file

  • #store_file(content_type, data) ⇒ String

    Stores the file with a server-determined key/filename

    Parameters:

    • content_type (String)

      the MIME Content-Type for the data

    • data (String, #read)

      the contents of the file

Returns:

  • (String)

    the key/filename where the object was stored



499
500
501
502
503
# File 'lib/riak/client.rb', line 499

def store_file(*args)
  http do |h|
    h.store_file(*args)
  end
end

#store_object(object, options = {}) ⇒ Object

Stores an object in Riak.



506
507
508
509
510
511
# File 'lib/riak/client.rb', line 506

def store_object(object, options = {})
  params = {:returnbody => true}.merge(options)
  backend do |b|
    b.store_object(object, params)
  end
end