Class: Libcouchbase::Bucket

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/libcouchbase/bucket.rb

Constant Summary collapse

AddDefaults =
{operation: :add}.freeze
ReplaceDefaults =
{operation: :replace}.freeze
ViewDefaults =
{
    on_error: :stop,
    stale: false
}
FtsDefaults =
{
    include_docs: true,
    size: 10000, # Max result size
    from: 0,
    explain: false
}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(**options) ⇒ Bucket

Returns a new instance of Bucket.



21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/libcouchbase/bucket.rb', line 21

def initialize(**options)
    @connection_options = options
    @connection = Connection.new(**options)
    connect

    # This obtains the connections reactor
    @reactor = reactor
    @quiet = false

    # clean up the connection once this object is garbage collected
    ObjectSpace.define_finalizer( self, self.class.finalize(@connection) )
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



35
36
37
# File 'lib/libcouchbase/bucket.rb', line 35

def connection
  @connection
end

#quietObject

Returns the value of attribute quiet.



36
37
38
# File 'lib/libcouchbase/bucket.rb', line 36

def quiet
  @quiet
end

Class Method Details

.finalize(connection) ⇒ Object



13
14
15
16
17
18
19
# File 'lib/libcouchbase/bucket.rb', line 13

def self.finalize(connection)
    proc {
        connection.destroy.finally do
            connection.reactor.unref
        end
    }
end

Instance Method Details

#[](key) ⇒ Object

Quietly obtain an object stored in Couchbase by given key.



161
162
163
# File 'lib/libcouchbase/bucket.rb', line 161

def [](key)
    get(key, quiet: true)
end

#add(key, value, async: false, **opts) ⇒ Libcouchbase::Result

Add the item to the database, but fail if the object exists already

Examples:

Store the key which will be expired in 2 seconds using relative TTL.

c.add("foo", "bar", expire_in: 2)

Store the key which will be expired in 2 seconds using absolute TTL.

c.add(:foo, :bar, expire_at: Time.now.to_i + 2)

Set application specific flags

c.add("foo", "bar", flags: 0x1000)

Ensure that the key will be persisted at least on the one node

c.add("foo", "bar", persist_to: 1)

Parameters:

  • key (String, Symbol)

    Key used to reference the value.

  • value (Object)

    Value to be stored

  • options (Hash)

    Options for operation.

Returns:

  • (Libcouchbase::Result)

    this includes the CAS value of the object.

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server

  • (Libouchbase::Error::Timedout)

    if timeout interval for observe exceeds

  • (Libouchbase::Error::NetworkError)

    if there was a communication issue



210
211
212
# File 'lib/libcouchbase/bucket.rb', line 210

def add(key, value, async: false, **opts)
    result @connection.store(key, value, **AddDefaults.merge(opts)), async
end

#compare_and_swap(key, **opts) {|value| ... } ⇒ Libcouchbase::Response Also known as: cas

Compare and swap value.

Reads a key’s value from the server and yields it to a block. Replaces the key’s value with the result of the block as long as the key hasn’t been updated in the meantime, otherwise raises Error::KeyExists.

Setting the :retry option to a positive number will cause this method to rescue the Error::KeyExists error that happens when an update collision is detected, and automatically get a fresh copy of the value and retry the block. This will repeat as long as there continues to be conflicts, up to the maximum number of retries specified.

Examples:

Implement append to JSON encoded value


c.set(:foo, {bar: 1})
c.cas(:foo) do |val|
  val[:baz] = 2
  val
end
c.get(:foo)      #=> {bar: 1, baz: 2}

Parameters:

  • key (String, Symbol)
  • options (Hash)

    the options for "swap" part

Yield Parameters:

  • value (Object)

    existing value

Yield Returns:

  • (Object)

    new value.

Returns:

Raises:

  • (Couchbase::Error::KeyExists)

    if the key was updated before the the code in block has been completed (the CAS value has been changed).

  • (ArgumentError)

    if the block is missing



634
635
636
637
638
639
640
641
642
643
644
645
646
647
# File 'lib/libcouchbase/bucket.rb', line 634

def compare_and_swap(key, **opts)
    retries = opts.delete(:retry) || 0
    begin
        current = result(@connection.get(key))
        new_value = yield current.value, opts
        opts[:cas] = current.cas

        set(key, new_value, **opts)
    rescue Libcouchbase::Error::KeyExists
        retries -= 1
        retry if retries >= 0
        raise
    end
end

#decr(key, by = 1, **opts) ⇒ Object

Decrement the value of an existing numeric key

Helper method, see incr



368
369
370
# File 'lib/libcouchbase/bucket.rb', line 368

def decr(key, by = 1, **opts)
    incr(key, -by, **opts)
end

#delete(key, async: false, quiet: true, **opts) ⇒ true, false

Delete the specified key

Examples:

Delete the key in quiet mode (default)

c.set("foo", "bar")
c.delete("foo")        #=> true
c.delete("foo")        #=> false

Delete the key verbosely

c.set("foo", "bar")
c.delete("foo", quiet: false)   #=> true
c.delete("foo", quiet: true)    #=> nil (default behaviour)
c.delete("foo", quiet: false)   #=> will raise Libcouchbase::Error::KeyNotFound

Delete the key with version check

res = c.set("foo", "bar")       #=> #<struct Libcouchbase::Response callback=:callback_set, key="foo", cas=1975457268957184, value="bar", metadata={:flags=>0}>
c.delete("foo", cas: 123456)    #=> will raise Libcouchbase::Error::KeyExists
c.delete("foo", cas: res.cas)   #=> true

Parameters:

  • key (String, Symbol)

    Key used to reference the value.

  • options (Hash)

    Options for operation.

Returns:

  • (true, false)

    the result of the operation.

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server with a different CAS value to that provided

  • (Libouchbase::Error::Timedout)

    if timeout interval for observe exceeds

  • (Libouchbase::Error::NetworkError)

    if there was a communication issue

  • (Libcouchbase::Error::KeyNotFound)

    if the key doesn't exists



408
409
410
411
412
413
414
415
416
417
418
419
420
# File 'lib/libcouchbase/bucket.rb', line 408

def delete(key, async: false, quiet: true, **opts)
    promise = @connection.remove(key, **opts).then { true }
    if quiet
        promise = promise.catch { |error|
            if error.is_a? Libcouchbase::Error::KeyNotFound
                false
            else
                ::Libuv::Q.reject(@reactor, error)
            end
        }
    end
    result promise, async
end

#delete_design_doc(id, rev = nil, async: false) ⇒ Object

Delete design doc with given id and optional revision.

Parameters:

  • id (String, Symbol)

    ID of the design doc

  • rev (String) (defaults to: nil)

    Optional revision

See Also:



593
594
595
596
597
# File 'lib/libcouchbase/bucket.rb', line 593

def delete_design_doc(id, rev = nil, async: false)
    id = id.to_s.sub(/^_design\//, '')
    rev = "?rev=#{rev}" if rev
    result @connection.http("/_design/#{id}#{rev}", method: :delete, type: :view), async
end

#design_docs(**opts) ⇒ Libcouchbase::DesignDocs

Fetch design docs stored in current bucket



487
488
489
# File 'lib/libcouchbase/bucket.rb', line 487

def design_docs(**opts)
    DesignDocs.new(self, @connection, proc { |promise, async| result(promise, async) }, **opts)
end

#fetch(key, value = nil, async: false, **opts) ⇒ Object

A helper method for returning a default value if one doesn’t exist for the key



166
167
168
169
170
171
172
# File 'lib/libcouchbase/bucket.rb', line 166

def fetch(key, value = nil, async: false, **opts)
    cached_obj = get(key, quiet: true, async: false, extended: false)
    return cached_obj if cached_obj
    value = value || yield
    set(key, value, opts.merge(async: false, extended: false))
    value
end

#flush(async: false) ⇒ Libcouchbase::Response

Delete contents of the bucket

Examples:

Simple flush the bucket

c.flush

Returns:

Raises:

  • (Libcouchbase::Error::HttpError)

    in case of an error is encountered.

See Also:



433
434
435
# File 'lib/libcouchbase/bucket.rb', line 433

def flush(async: false)
    result @connection.flush, async
end

#full_text_search(index, query, **opts, &row_modifier) ⇒ Libcouchbase::Results

Returns an enumerable for the results in a full text search.

Results are lazily loaded when an operation is performed on the enum



521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
# File 'lib/libcouchbase/bucket.rb', line 521

def full_text_search(index, query, **opts, &row_modifier)
    if query.is_a? Hash
        opts[:query] = query
    else
        opts[:query] = {query: query}
    end
    fts = @connection.full_text_search(index, **FtsDefaults.merge(opts))

    current = ::Libuv::Reactor.current
    if current && current.running?
        ResultsLibuv.new(fts, current, &row_modifier)
    elsif Object.const_defined?(:EventMachine) && EM.reactor_thread?
        ResultsEM.new(fts, &row_modifier)
    else
        ResultsNative.new(fts, &row_modifier)
    end
end

#get(key, *keys, extended: false, async: false, quiet: @quiet, assemble_hash: false, **opts) ⇒ Object, ...

Obtain an object stored in Couchbase by given key.

Examples:

Get single value in quiet mode (the default)

c.get("foo")     #=> the associated value or nil

Use alternative hash-like syntax

c["foo"]         #=> the associated value or nil

Get single value in verbose mode

c.get("missing-foo", quiet: false)  #=> raises Libcouchbase::Error::NotFound

Get multiple keys

c.get("foo", "bar", "baz")   #=> [val1, val2, val3]

Get multiple keys with assembing result into the Hash

c.get("foo", "bar", "baz", assemble_hash: true)
#=> {"foo" => val1, "bar" => val2, "baz" => val3}

Get and lock key using default timeout

c.get("foo", lock: true)  # This locks for the maximum time of 30 seconds

Get and lock key using custom timeout

c.get("foo", lock: 3)

Get and lock multiple keys using custom timeout

c.get("foo", "bar", lock: 3)

Parameters:

  • keys (String, Symbol, Array)

    One or several keys to fetch

  • options (Hash)

    Options for operation.

Returns:

  • (Object, Array, Hash, Libcouchbase::Result)

    the value(s)

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server with a different CAS value to that provided

  • (Libouchbase::Error::Timedout)

    if timeout interval for observe exceeds

  • (Libouchbase::Error::NetworkError)

    if there was a communication issue

  • (Libcouchbase::Error::KeyNotFound)

    if the key doesn't exists



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/libcouchbase/bucket.rb', line 86

def get(key, *keys, extended: false, async: false, quiet: @quiet, assemble_hash: false, **opts)
    was_array = key.respond_to?(:to_a) || keys.length > 0
    keys.unshift Array(key) # Convert enumerables
    keys.flatten!           # Ensure we're left with a list of keys

    if keys.length == 1
        promise = @connection.get(keys[0], **opts)

        unless extended
            promise = promise.then(proc { |resp|
                resp.value
            })
        end

        if quiet
            promise = promise.catch { |err|
                if err.is_a? Libcouchbase::Error::KeyNotFound
                    nil
                else
                    ::Libuv::Q.reject(@reactor, err)
                end
            }
        end

        if assemble_hash
            promise = promise.then(proc { |val|
                hash = defined?(::HashWithIndifferentAccess) ? ::HashWithIndifferentAccess.new : {}
                hash[keys[0]] = val
                hash
            })
        elsif was_array
            promise = promise.then(proc { |val|
                Array(val)
            })
        end

        result(promise, async)
    else
        promises = keys.collect { |key|
            @connection.get(key, **opts)
        }

        if quiet
            promises.map! { |prom|
                prom.catch { |err|
                    if err.is_a? Libcouchbase::Error::KeyNotFound
                        nil
                    else
                        ::Libuv::Q.reject(@reactor, err)
                    end
                }
            }
        end

        result(@reactor.all(*promises).then(proc { |results|
            if extended
                results.compact!
            else
                results.collect! { |resp| resp.value if resp }
            end

            if assemble_hash
                hash = defined?(::HashWithIndifferentAccess) ? ::HashWithIndifferentAccess.new : {}
                keys.each_with_index do |key, index|
                    hash[key] = results[index]
                end
                hash
            else
                results
            end
        }), async)
    end
end

#get_num_nodesInteger

The numbers of nodes in the cluster

Returns:

  • (Integer)


658
659
660
# File 'lib/libcouchbase/bucket.rb', line 658

def get_num_nodes
    result @connection.get_num_nodes
end

#get_num_replicasInteger

The numbers of the replicas for each node in the cluster

Returns:

  • (Integer)


652
653
654
# File 'lib/libcouchbase/bucket.rb', line 652

def get_num_replicas
    result @connection.get_num_replicas
end

#incr(key, by = 1, create: false, extended: false, async: false, **opts) ⇒ Integer

Increment the value of an existing numeric key

The increment method allow you to increase or decrease a given stored integer value. Updating the value of a key if it can be parsed to an integer. The update operation occurs on the server and is provided at the protocol level. This simplifies what would otherwise be a two-stage get and set operation.

Examples:

Increment key by one

c.incr(:foo)

Increment key by 50

c.incr("foo", 50)

Increment key by one OR initialize with zero

c.incr("foo", create: true)   #=> will return old+1 or 0

Increment key by one OR initialize with three

c.incr("foo", 50, initial: 3) #=> will return old+50 or 3

Increment key and get its CAS value

resp = c.incr("foo", :extended => true)
resp.cas   #=> 12345
resp.value #=> 2

Parameters:

  • key (String, Symbol)

    Key used to reference the value.

  • by (Integer) (defaults to: 1)

    Integer (up to 64 bits) value to increment or decrement

  • options (Hash)

    Options for operation.

Returns:

  • (Integer)

    the actual value of the key.

Raises:

  • (Libouchbase::Error::Timedout)

    if timeout interval for observe exceeds

  • (Libouchbase::Error::NetworkError)

    if there was a communication issue

  • (Libcouchbase::Error::KeyNotFound)

    if the key doesn't exists

  • (Libcouchbase::Error::DeltaBadval)

    if the key contains non-numeric value



355
356
357
358
359
360
361
362
363
# File 'lib/libcouchbase/bucket.rb', line 355

def incr(key, by = 1, create: false, extended: false, async: false, **opts)
    opts[:delta] ||= by
    opts[:initial] = 0 if create
    promise = @connection.counter(key, **opts)
    if not extended
        promise = promise.then { |resp| resp.value }
    end
    result promise, async
end

#n1ql(**options) ⇒ Libcouchbase::N1QL

Returns an n1ql query builder.

Returns:



548
549
550
# File 'lib/libcouchbase/bucket.rb', line 548

def n1ql(**options)
    N1QL.new(self, **options)
end

#replace(key, value, async: false, **opts) ⇒ Libcouchbase::Result

Replace the existing object in the database

Examples:

Store the key which will be expired in 2 seconds using relative TTL.

c.replace("foo", "bar", expire_in: 2)

Store the key which will be expired in 2 seconds using absolute TTL.

c.replace(:foo, :bar, expire_at: Time.now.to_i + 2)

Set application specific flags

c.replace("foo", "bar", flags: 0x1000)

Ensure that the key will be persisted at least on the one node

c.replace("foo", "bar", persist_to: 1)

Parameters:

  • key (String, Symbol)

    Key used to reference the value.

  • value (Object)

    Value to be stored

  • options (Hash)

    Options for operation.

Returns:

  • (Libcouchbase::Result)

    this includes the CAS value of the object.

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server with a different CAS value to that provided

  • (Libouchbase::Error::Timedout)

    if timeout interval for observe exceeds

  • (Libouchbase::Error::NetworkError)

    if there was a communication issue

  • (Libcouchbase::Error::KeyNotFound)

    if the key doesn't exists



302
303
304
# File 'lib/libcouchbase/bucket.rb', line 302

def replace(key, value, async: false, **opts)
    result @connection.store(key, value, **ReplaceDefaults.merge(opts)), async
end

#save_design_doc(data, id = nil, async: false) ⇒ Object

Update or create design doc with supplied views

Parameters:

  • data (Hash, IO, String)

    The source object containing JSON encoded design document.

See Also:



558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
# File 'lib/libcouchbase/bucket.rb', line 558

def save_design_doc(data, id = nil, async: false)
    attrs = case data
    when String
        JSON.parse(data, Connection::DECODE_OPTIONS)
    when IO
        JSON.parse(data.read, Connection::DECODE_OPTIONS)
    when Hash
        data
    else
        raise ArgumentError, "Document should be Hash, String or IO instance"
    end
    attrs[:language] ||= :javascript

    id ||= attrs.delete(:_id)
    id = id.to_s.sub(/^_design\//, '')

    prom = @connection.http("/_design/#{id}",
        method: :put,
        body: attrs,
        type: :view
    ).then { |res|
        # Seems to require a moment before the view is usable
        @reactor.sleep 100
        res
    }

    result prom, async
end

#set(key, value, async: false, **opts) ⇒ Libcouchbase::Result Also known as: []=

Unconditionally store the object in the Couchbase

Examples:

Store the key which will be expired in 2 seconds using relative TTL.

c.set("foo", "bar", expire_in: 2)

Store the key which will be expired in 2 seconds using absolute TTL.

c.set(:foo, :bar, expire_at: Time.now.to_i + 2)

Use hash-like syntax to store the value

c[:foo] = {bar: :baz}

Set application specific flags

c.set("foo", "bar", flags: 0x1000)

Perform optimistic locking by specifying last known CAS version

c.set("foo", "bar", cas: 8835713818674332672)

Ensure that the key will be persisted at least on the one node

c.set("foo", "bar", persist_to: 1)

Parameters:

  • key (String, Symbol)

    Key used to reference the value.

  • value (Object)

    Value to be stored

  • options (Hash)

    Options for operation.

Returns:

  • (Libcouchbase::Result)

    this includes the CAS value of the object.

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server with a different CAS value to that provided

  • (Libouchbase::Error::Timedout)

    if timeout interval for observe exceeds

  • (Libouchbase::Error::NetworkError)

    if there was a communication issue



258
259
260
261
# File 'lib/libcouchbase/bucket.rb', line 258

def set(key, value, async: false, **opts)
    # default operation is set
    result @connection.store(key, value, **opts), async
end

#subdoc(key, quiet: @quiet, **opts) {|the| ... } ⇒ Object

Perform subdocument operations on a key.

Yields a request builder to a block and applies the operations performed

Examples:

Perform a subdocument operation using a block

c.subdoc(:foo) { |subdoc|
  subdoc.get('sub.key')
  subdoc.exists?('other.key')
  subdoc.get_count('some.array')
} # => ["sub key val", true, 23]

perform a subdocument operation using execute!

c.subdoc(:foo).get(:bob).execute! # => { age: 13, working: false }

perform multiple subdocument operations using execute!

c.subdoc(:foo)
 .get(:bob).get(:jane).execute! # => [{ age: 13, working: false }, { age: 47, working: true }]

perform a subdocument mutation operation

c.subdoc(:foo).counter('bob.age', 1).execute! # => 14

Parameters:

  • key (String, Symbol)

Yield Parameters:



466
467
468
469
470
471
472
473
474
# File 'lib/libcouchbase/bucket.rb', line 466

def subdoc(key, quiet: @quiet, **opts)
    if block_given?
        sd = SubdocRequest.new(key, quiet)
        yield sd
        subdoc_execute!(sd, opts)
    else
        SubdocRequest.new(key, quiet, bucket: self, exec_opts: opts)
    end
end

#subdoc_execute!(sd, extended: false, async: false, **opts) ⇒ Object



476
477
478
479
480
481
482
# File 'lib/libcouchbase/bucket.rb', line 476

def subdoc_execute!(sd, extended: false, async: false, **opts)
    promise = @connection.subdoc(sd, opts).then { |resp|
        raise resp.value if resp.value.is_a?(::Exception)
        extended ? resp : resp.value
    }
    result promise, async
end

#touch(key, async: false, **opts) ⇒ Object

Touch a key, changing its CAS and optionally setting a timeout



438
439
440
# File 'lib/libcouchbase/bucket.rb', line 438

def touch(key, async: false, **opts)
    result @connection.touch(key, **opts), async
end

#view(design, view, include_docs: true, is_spatial: false, **opts, &row_modifier) ⇒ Libcouchbase::Results

Returns an enumerable for the results in a view.

Results are lazily loaded when an operation is performed on the enum



496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
# File 'lib/libcouchbase/bucket.rb', line 496

def view(design, view, include_docs: true, is_spatial: false, **opts, &row_modifier)
    view = @connection.query_view(design, view, **ViewDefaults.merge(opts))
    view.include_docs = include_docs
    view.is_spatial = is_spatial

    current = ::Libuv::Reactor.current

    if current && current.running?
        ResultsLibuv.new(view, current, &row_modifier)
    elsif Object.const_defined?(:EventMachine) && EM.reactor_thread?
        ResultsEM.new(view, &row_modifier)
    else
        ResultsNative.new(view, &row_modifier)
    end
end

#wait_results(*results) ⇒ Array

Waits for all the async operations to complete and returns the results

Returns:

  • (Array)


665
666
667
# File 'lib/libcouchbase/bucket.rb', line 665

def wait_results(*results)
    result ::Libuv::Q.all(@reactor, *results.flatten)
end