Class: Libcouchbase::Bucket

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/libcouchbase/bucket.rb

Constant Summary

AddDefaults =
{operation: :add}.freeze
ReplaceDefaults =
{operation: :replace}.freeze
AppendDefaults =
{operation: :append}.freeze
PrependDefaults =
{operation: :prepend}.freeze
ViewDefaults =
{
    on_error: :stop,
    stale: false
}
FtsDefaults =
{
    include_docs: true,
    size: 10000, # Max result size
    from: 0,
    explain: false
}


Constructor Details

#initialize(**options) ⇒ Bucket

Returns a new instance of Bucket.



# File 'lib/libcouchbase/bucket.rb', line 21

def initialize(**options)
    @connection_options = options
    @connection = Connection.new(**options)
    connect

    # This obtains the connections reactor
    @reactor = reactor
    @quiet = false

    # clean up the connection once this object is garbage collected
    ObjectSpace.define_finalizer( self, self.class.finalize(@connection) )
end

Instance Attribute Details

#connection ⇒ Object (readonly)

Returns the value of attribute connection.



# File 'lib/libcouchbase/bucket.rb', line 35

def connection
  @connection
end

#quiet ⇒ Object

Returns the value of attribute quiet.



# File 'lib/libcouchbase/bucket.rb', line 36

def quiet
  @quiet
end

Class Method Details

.finalize(connection) ⇒ Object



# File 'lib/libcouchbase/bucket.rb', line 13

def self.finalize(connection)
    proc {
        connection.destroy.finally do
            connection.reactor.unref
        end
    }
end
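
The finalizer is built in a class method so that the proc closes over the connection only and not over the Bucket instance; a proc that captured the instance would keep it reachable and it could never be collected. A minimal sketch of that pattern follows. FakeConnection and Resource are hypothetical stand-ins, and the real destroy returns a promise rather than a plain value.

```ruby
# Hypothetical stand-in for a connection; it only records that it was destroyed.
class FakeConnection
  attr_reader :destroyed

  def initialize
    @destroyed = false
  end

  def destroy
    @destroyed = true
  end
end

# Hypothetical owner object following the same finalizer pattern as Bucket.
class Resource
  # Built in a class method: the proc references `connection` only,
  # never the instance that is being garbage collected.
  def self.finalize(connection)
    proc { connection.destroy }
  end

  def initialize(connection)
    @connection = connection
    ObjectSpace.define_finalizer(self, self.class.finalize(connection))
  end
end

conn = FakeConnection.new
cleanup = Resource.finalize(conn)
cleanup.call(0) # the GC passes the object id; a proc ignores the extra argument
```

Calling the proc by hand, as above, is only a way to observe the cleanup deterministically; in normal use the garbage collector invokes it.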

Instance Method Details

#[](key) ⇒ Object

Quietly obtain an object stored in Couchbase by given key.



# File 'lib/libcouchbase/bucket.rb', line 163

def [](key)
    get(key, quiet: true)
end

#add(key, value, async: false, **opts) ⇒ Libcouchbase::Result

Add the item to the database, but fail if the object exists already

Examples:

Store the key which will be expired in 2 seconds using relative TTL.

c.add("foo", "bar", expire_in: 2)

Store the key which will be expired in 2 seconds using absolute TTL.

c.add(:foo, :bar, expire_at: Time.now.to_i + 2)

Force JSON document format for value

c.add("foo", {"bar" => "baz"}, format: :document)

Set application specific flags (note that it will be OR-ed with format flags)

c.add("foo", "bar", flags: 0x1000)

Ensure that the key is persisted on at least one node

c.add("foo", "bar", persist_to: 1)

Parameters:

  • key (String, Symbol)

    Key used to reference the value.

  • value (Object)

    Value to be stored

  • options (Hash)

    Options for operation.

Returns:

  • (Libcouchbase::Result)

    this includes the CAS value of the object.

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server

  • (Libcouchbase::Error::Timedout)

    if the timeout interval for the observe operation is exceeded

  • (Libcouchbase::Error::NetworkError)

    if there was a communication issue



# File 'lib/libcouchbase/bucket.rb', line 207

def add(key, value, async: false, **opts)
    result @connection.store(key, value, **AddDefaults.merge(opts)), async
end
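
The store helpers (add, replace, append, prepend) differ only in the frozen defaults hash they merge the caller's options into. The sketch below shows that merge in isolation; ADD_DEFAULTS and store_options are illustrative names, not part of the library. Because the caller's options are merged last, they win over the defaults.

```ruby
# Mirrors the shape of AddDefaults from the constant summary.
ADD_DEFAULTS = { operation: :add }.freeze

# Hypothetical helper: returns the options that would be handed to the store call.
def store_options(defaults, **opts)
  # Hash#merge returns a new hash, so the frozen defaults are never mutated.
  defaults.merge(opts)
end

plain      = store_options(ADD_DEFAULTS, expire_in: 2)
overridden = store_options(ADD_DEFAULTS, operation: :replace)
```

One consequence of this design is that an explicit operation: key in the options replaces the default operation.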

#append(key, value, async: false, **opts) ⇒ Libcouchbase::Result

Note:

From the server's point of view this operation is not data-aware: the server treats the value as a binary stream and simply concatenates it. It therefore won't work with the :marshal and :document formats, because the server has no knowledge of how to merge values in those formats.

Append this object to the existing object

Examples:

Simple append

c.set(:foo, "aaa", format: :plain)
c.append(:foo, "bbb")
c.get("foo")           #=> "aaabbb"

Using optimistic locking. The operation will fail on CAS mismatch

resp = c.set("foo", "aaa", format: :plain)
c.append("foo", "bbb", cas: resp.cas)

Ensure that the key is persisted on at least one node

c.append("foo", "bar", persist_to: 1)

Parameters:

  • key (String, Symbol)

    Key used to reference the value.

  • value (Object)

    Value to be appended

  • options (Hash)

    Options for operation.

Returns:

  • (Libcouchbase::Result)

    this includes the CAS value of the object.

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server with a different CAS value to that provided

  • (Libcouchbase::Error::Timedout)

    if the timeout interval for the observe operation is exceeded

  • (Libcouchbase::Error::NetworkError)

    if there was a communication issue

  • (Libcouchbase::Error::KeyNotFound)

    if the key does not exist



# File 'lib/libcouchbase/bucket.rb', line 352

def append(key, value, async: false, **opts)
    result @connection.store(key, value, **AppendDefaults.merge(opts)), async
end
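
The note above about formats can be illustrated without a server. This sketch uses plain Ruby strings to stand in for stored bytes and shows that byte-level concatenation keeps plain text usable but yields something that no longer parses as a single JSON document.

```ruby
require 'json'

# Plain values concatenate cleanly.
plain = "aaa" + "bbb"

# Two serialized JSON documents glued together byte-for-byte.
existing = JSON.generate({ "bar" => 1 })
appended = existing + JSON.generate({ "baz" => 2 })

# Check whether the concatenated bytes still form one valid JSON document.
concatenated_is_valid_json =
  begin
    JSON.parse(appended)
    true
  rescue JSON::ParserError
    false
  end
```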

#compare_and_swap(key, **opts) {|value| ... } ⇒ Libcouchbase::Response Also known as: cas

Compare and swap value.

Reads a key’s value from the server and yields it to a block. Replaces the key’s value with the result of the block as long as the key hasn’t been updated in the meantime, otherwise raises Error::KeyExists.

Setting the :retry option to a positive number causes this method to rescue the Error::KeyExists error raised when an update collision is detected, fetch a fresh copy of the value, and run the block again. This repeats for as long as conflicts continue, up to the maximum number of retries specified.

Examples:

Implement append to JSON encoded value


c.set(:foo, {bar: 1})
c.cas(:foo) do |val|
  val[:baz] = 2
  val
end
c.get(:foo)      #=> {bar: 1, baz: 2}

Parameters:

  • key (String, Symbol)
  • options (Hash)

    the options for "swap" part

Yield Parameters:

  • value (Object)

    existing value

Yield Returns:

  • (Object)

    new value.

Returns:

  • (Libcouchbase::Response)

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key was updated before the code in the block completed (the CAS value has changed).

  • (ArgumentError)

    if the block is missing



# File 'lib/libcouchbase/bucket.rb', line 681

def compare_and_swap(key, **opts)
    retries = opts.delete(:retry) || 0
    begin
        current = result(@connection.get(key))
        new_value = yield current.value, opts
        opts[:cas] = current.cas

        set(key, new_value, **opts)
    rescue Libcouchbase::Error::KeyExists
        retries -= 1
        retry if retries >= 0
        raise
    end
end
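
The retry loop can be exercised against an in-memory stand-in. The sketch below is not the library's implementation: TinyStore, its integer CAS counter, and the local KeyExists class are all hypothetical, and everything runs synchronously. It keeps the same control flow as the method above: read, yield, write with the CAS that was read, and retry on a collision while retries remain.

```ruby
class KeyExists < StandardError; end

# Hypothetical in-memory store with a per-key CAS token.
class TinyStore
  Entry = Struct.new(:value, :cas)

  def initialize
    @data = {}
    @counter = 0
  end

  def get(key)
    @data.fetch(key).dup
  end

  # Rejects the write when a CAS is supplied and no longer matches.
  def set(key, value, cas: nil)
    current = @data[key]
    raise KeyExists if cas && current && current.cas != cas
    @counter += 1
    @data[key] = Entry.new(value, @counter)
  end
end

def compare_and_swap(store, key, retries: 0)
  begin
    current = store.get(key)
    new_value = yield current.value
    store.set(key, new_value, cas: current.cas)
  rescue KeyExists
    retries -= 1
    retry if retries >= 0
    raise
  end
end

store = TinyStore.new
store.set(:foo, { bar: 1 })

attempts = 0
compare_and_swap(store, :foo, retries: 1) do |val|
  attempts += 1
  # Simulate a concurrent writer, on the first attempt only.
  store.set(:foo, { bar: 100 }) if attempts == 1
  val.merge(baz: 2)
end
```

Here the first attempt collides with the simulated writer, so the block runs a second time against the fresh value.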

#decr(key, by = 1, **opts) ⇒ Object

Decrement the value of an existing numeric key

Helper method, see incr



# File 'lib/libcouchbase/bucket.rb', line 463

def decr(key, by = 1, **opts)
    incr(key, -by, **opts)
end

#delete(key, async: false, quiet: true, **opts) ⇒ true, false

Delete the specified key

Examples:

Delete the key in quiet mode (default)

c.set("foo", "bar")
c.delete("foo")        #=> true
c.delete("foo")        #=> false

Delete the key verbosely

c.set("foo", "bar")
c.delete("foo", quiet: false)   #=> true
c.delete("foo", quiet: true)    #=> false (default behaviour)
c.delete("foo", quiet: false)   #=> will raise Libcouchbase::Error::KeyNotFound

Delete the key with version check

res = c.set("foo", "bar")       #=> #<struct Libcouchbase::Response callback=:callback_set, key="foo", cas=1975457268957184, value="bar", metadata={:format=>:document, :flags=>0}>
c.delete("foo", cas: 123456)    #=> will raise Libcouchbase::Error::KeyExists
c.delete("foo", cas: res.cas)   #=> true

Parameters:

  • key (String, Symbol)

    Key used to reference the value.

  • options (Hash)

    Options for operation.

Returns:

  • (true, false)

    the result of the operation.

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server with a different CAS value to that provided

  • (Libcouchbase::Error::Timedout)

    if the timeout interval for the observe operation is exceeded

  • (Libcouchbase::Error::NetworkError)

    if there was a communication issue

  • (Libcouchbase::Error::KeyNotFound)

    if the key does not exist



# File 'lib/libcouchbase/bucket.rb', line 503

def delete(key, async: false, quiet: true, **opts)
    promise = @connection.remove(key, **opts).then { true }
    if quiet
        promise = promise.catch { |error|
            if error.is_a? Libcouchbase::Error::KeyNotFound
                false
            else
                ::Libuv::Q.reject(@reactor, error)
            end
        }
    end
    result promise, async
end
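
The effect of quiet mode can be sketched synchronously, without promises. In this illustration a plain Hash stands in for the bucket and KeyNotFound is a local class, both assumptions made for the example. As in the method above, a missing key becomes false when quiet is set, and the error propagates otherwise.

```ruby
class KeyNotFound < StandardError; end

# Hypothetical synchronous version of the quiet/verbose behaviour.
def delete(store, key, quiet: true)
  raise KeyNotFound, key.to_s unless store.key?(key)
  store.delete(key)
  true
rescue KeyNotFound
  # Verbose mode re-raises; quiet mode reports the miss as false.
  raise unless quiet
  false
end

store = { "foo" => "bar" }
first  = delete(store, "foo")
second = delete(store, "foo")
```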

#delete_design_doc(id, rev = nil, async: false) ⇒ Object

Delete design doc with given id and optional revision.

Parameters:

  • id (String, Symbol)

    ID of the design doc

  • rev (String) (defaults to: nil)

    Optional revision

# File 'lib/libcouchbase/bucket.rb', line 640

def delete_design_doc(id, rev = nil, async: false)
    id = id.to_s.sub(/^_design\//, '')
    rev = "?rev=#{rev}" if rev
    result @connection.http("/_design/#{id}#{rev}", method: :delete, type: :view), async
end
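
The path handling in this method can be looked at on its own. The helper below, design_doc_path, is a hypothetical name introduced for illustration; it reproduces only the string manipulation, not the HTTP request.

```ruby
# Hypothetical helper reproducing the path construction only.
def design_doc_path(id, rev = nil)
  # Accept both "users" and "_design/users" by stripping an existing prefix.
  id = id.to_s.sub(/^_design\//, '')
  query = rev ? "?rev=#{rev}" : ""
  "/_design/#{id}#{query}"
end

bare     = design_doc_path(:users)
prefixed = design_doc_path("_design/users", "1-abc")
```

Stripping the prefix first means callers may pass the id with or without a leading _design/.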

#design_docs(**opts) ⇒ Libcouchbase::DesignDocs

Fetch design docs stored in current bucket



# File 'lib/libcouchbase/bucket.rb', line 540

def design_docs(**opts)
    DesignDocs.new(self, @connection, method(:result), **opts)
end

#flush(async: false) ⇒ Libcouchbase::Response

Delete contents of the bucket

Examples:

Simply flush the bucket

c.flush

Returns:

  • (Libcouchbase::Response)

Raises:

  • (Libcouchbase::Error::HttpError)

    if an error is encountered.

# File 'lib/libcouchbase/bucket.rb', line 528

def flush(async: false)
    result @connection.flush, async
end

#full_text_search(index, query, **opts, &row_modifier) ⇒ Libcouchbase::Results

Returns an enumerable for the results in a full text search.

Results are lazily loaded when an operation is performed on the enumerable.



# File 'lib/libcouchbase/bucket.rb', line 574

def full_text_search(index, query, **opts, &row_modifier)
    if query.is_a? Hash
        opts[:query] = query
    else
        opts[:query] = {query: query}
    end
    fts = @connection.full_text_search(index, **FtsDefaults.merge(opts))

    current = ::Libuv::Reactor.current
    if current && current.running?
        ResultsLibuv.new(fts, current, &row_modifier)
    elsif Object.const_defined?(:EventMachine) && EM.reactor_thread?
        ResultsEM.new(fts, &row_modifier)
    else
        ResultsNative.new(fts, &row_modifier)
    end
end
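
The first half of this method normalizes the query argument and layers the caller's options over FtsDefaults. A small sketch of just that step follows; fts_options is a hypothetical helper and FTS_DEFAULTS copies the values listed in the constant summary.

```ruby
# Copy of the FtsDefaults values shown in the constant summary.
FTS_DEFAULTS = {
  include_docs: true,
  size: 10000,
  from: 0,
  explain: false
}.freeze

# Hypothetical helper: a bare string is wrapped as {query: ...},
# while a Hash is passed through as the query definition.
def fts_options(query, **opts)
  opts[:query] = query.is_a?(Hash) ? query : { query: query }
  FTS_DEFAULTS.merge(opts)
end

simple = fts_options("beer")
custom = fts_options({ match: "beer" }, size: 5)
```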

#get(key, *keys, extended: false, async: false, quiet: @quiet, assemble_hash: false, **opts) ⇒ Object, ...

Obtain an object stored in Couchbase by given key.

Examples:

Get single value in quiet mode

c.get("foo", quiet: true)     #=> the associated value or nil

Use alternative hash-like syntax

c["foo"]         #=> the associated value or nil

Get single value in verbose mode

c.get("missing-foo", quiet: false)  #=> raises Libcouchbase::Error::KeyNotFound

Get multiple keys

c.get("foo", "bar", "baz")   #=> [val1, val2, val3]

Get multiple keys, assembling the result into a Hash

c.get("foo", "bar", "baz", assemble_hash: true)
#=> {"foo" => val1, "bar" => val2, "baz" => val3}

Get and lock key using default timeout

c.get("foo", lock: true)  # This locks for the maximum time of 30 seconds

Get and lock key using custom timeout

c.get("foo", lock: 3)

Get and lock multiple keys using custom timeout

c.get("foo", "bar", lock: 3)

Parameters:

  • keys (String, Symbol, Array)

    One or several keys to fetch

  • options (Hash)

    Options for operation.

Returns:

  • (Object, Array, Hash, Libcouchbase::Result)

    the value(s)

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server with a different CAS value to that provided

  • (Libcouchbase::Error::Timedout)

    if the timeout interval for the observe operation is exceeded

  • (Libcouchbase::Error::NetworkError)

    if there was a communication issue

  • (Libcouchbase::Error::KeyNotFound)

    if the key does not exist



# File 'lib/libcouchbase/bucket.rb', line 88

def get(key, *keys, extended: false, async: false, quiet: @quiet, assemble_hash: false, **opts)
    was_array = key.respond_to?(:to_a) || keys.length > 0
    keys.unshift Array(key) # Convert enumerables
    keys.flatten!           # Ensure we're left with a list of keys

    if keys.length == 1
        promise = @connection.get(keys[0], **opts)

        unless extended
            promise = promise.then(proc { |resp|
                resp.value
            })
        end

        if quiet
            promise = promise.catch { |err|
                if err.is_a? Libcouchbase::Error::KeyNotFound
                    nil
                else
                    ::Libuv::Q.reject(@reactor, err)
                end
            }
        end

        if assemble_hash
            promise = promise.then(proc { |val|
                hash = defined?(::HashWithIndifferentAccess) ? ::HashWithIndifferentAccess.new : {}
                hash[keys[0]] = val
                hash
            })
        elsif was_array
            promise = promise.then(proc { |val|
                Array(val)
            })
        end

        result(promise, async)
    else
        promises = keys.collect { |key|
            @connection.get(key, **opts)
        }

        if quiet
            promises.map! { |prom|
                prom.catch { |err|
                    if err.is_a? Libcouchbase::Error::KeyNotFound
                        nil
                    else
                        ::Libuv::Q.reject(@reactor, err)
                    end
                }
            }
        end

        result(@reactor.all(*promises).then(proc { |results|
            if extended
                results.compact!
            else
                results.collect! { |resp| resp.value if resp }
            end

            if assemble_hash
                hash = defined?(::HashWithIndifferentAccess) ? ::HashWithIndifferentAccess.new : {}
                keys.each_with_index do |key, index|
                    hash[key] = results[index]
                end
                hash
            else
                results
            end
        }), async)
    end
end
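
The opening lines of this method decide two things: the flat list of keys to fetch, and whether the caller asked for a collection (which determines if a single result is wrapped in an Array). The sketch below isolates that step; normalize_keys is a hypothetical helper name.

```ruby
# Hypothetical helper mirroring the key normalization at the top of #get.
def normalize_keys(key, *keys)
  # A collection-like first argument, or any extra keys, means "return an Array".
  was_array = key.respond_to?(:to_a) || keys.length > 0
  keys.unshift Array(key) # Convert enumerables
  keys.flatten!           # Ensure we're left with a flat list of keys
  [keys, was_array]
end

single  = normalize_keys("foo")
wrapped = normalize_keys(["foo"])
several = normalize_keys("foo", "bar", "baz")
mixed   = normalize_keys(["foo", "bar"], :baz)
```

Note that a one-element Array still counts as a collection request, so the single-key branch of the method wraps its result in an Array in that case.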

#get_num_nodes ⇒ Integer

The number of nodes in the cluster

Returns:

  • (Integer)


# File 'lib/libcouchbase/bucket.rb', line 705

def get_num_nodes
    result @connection.get_num_nodes
end

#get_num_replicas ⇒ Integer

The number of replicas for each node in the cluster

Returns:

  • (Integer)


# File 'lib/libcouchbase/bucket.rb', line 699

def get_num_replicas
    result @connection.get_num_replicas
end

#incr(key, by = 1, create: false, extended: false, async: false, **opts) ⇒ Integer

Increment the value of an existing numeric key

The increment method lets you increase or decrease a stored integer value, provided the value of the key can be parsed as an integer. The update is performed on the server at the protocol level, which avoids what would otherwise be a two-stage get and set operation.

Examples:

Increment key by one

c.incr(:foo)

Increment key by 50

c.incr("foo", 50)

Increment key by one OR initialize with zero

c.incr("foo", create: true)   #=> will return old+1 or 0

Increment key by 50 OR initialize with three

c.incr("foo", 50, initial: 3) #=> will return old+50 or 3

Increment key and get its CAS value

resp = c.incr("foo", :extended => true)
resp.cas   #=> 12345
resp.value #=> 2

Parameters:

  • key (String, Symbol)

    Key used to reference the value.

  • by (Integer) (defaults to: 1)

    Integer (up to 64 bits) value to increment or decrement

  • options (Hash)

    Options for operation.

Returns:

  • (Integer)

    the actual value of the key.

Raises:

  • (Libcouchbase::Error::Timedout)

    if the timeout interval for the observe operation is exceeded

  • (Libcouchbase::Error::NetworkError)

    if there was a communication issue

  • (Libcouchbase::Error::KeyNotFound)

    if the key does not exist

  • (Libcouchbase::Error::DeltaBadval)

    if the key contains non-numeric value



# File 'lib/libcouchbase/bucket.rb', line 450

def incr(key, by = 1, create: false, extended: false, async: false, **opts)
    opts[:delta] ||= by
    opts[:initial] = 0 if create
    promise = @connection.counter(key, **opts)
    if not extended
        promise = promise.then { |resp| resp.value }
    end
    result promise, async
end
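
How the by, create and initial arguments combine is easiest to see by looking at the options hash that ends up being passed to the counter call. The sketch below reproduces only that bookkeeping; counter_options and decrement_options are hypothetical helper names.

```ruby
# Hypothetical helper mirroring the option handling in #incr.
def counter_options(by = 1, create: false, **opts)
  opts[:delta] ||= by          # an explicit :delta option wins over `by`
  opts[:initial] = 0 if create # create: true always initializes with zero
  opts
end

# Mirrors #decr, which simply negates the amount.
def decrement_options(by = 1, **opts)
  counter_options(-by, **opts)
end

default_step = counter_options
with_initial = counter_options(50, initial: 3)
with_create  = counter_options(create: true)
decrement    = decrement_options(5)
```

As written in the source above, combining create: true with an explicit initial: value results in an initial value of 0, because the create branch assigns last.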

#n1ql(**options) ⇒ Libcouchbase::N1QL

Returns an N1QL query builder.

Returns:

  • (Libcouchbase::N1QL)



# File 'lib/libcouchbase/bucket.rb', line 601

def n1ql(**options)
    N1QL.new(self, **options)
end

#prepend(key, value, async: false, **opts) ⇒ Libcouchbase::Result

Note:

From the server's point of view this operation is not data-aware: the server treats the value as a binary stream and simply concatenates it. It therefore won't work with the :marshal and :document formats, because the server has no knowledge of how to merge values in those formats.

Prepend this object to the existing object

Examples:

Simple prepend

c.set(:foo, "aaa", format: :plain)
c.prepend(:foo, "bbb")
c.get("foo")           #=> "bbbaaa"

Using optimistic locking. The operation will fail on CAS mismatch

resp = c.set("foo", "aaa", format: :plain)
c.prepend("foo", "bbb", cas: resp.cas)

Ensure that the key is persisted on at least one node

c.prepend("foo", "bar", persist_to: 1)

Parameters:

  • key (String, Symbol)

    Key used to reference the value.

  • value (Object)

    Value to be prepended

  • options (Hash)

    Options for operation.

Returns:

  • (Libcouchbase::Result)

    this includes the CAS value of the object.

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server with a different CAS value to that provided

  • (Libcouchbase::Error::Timedout)

    if the timeout interval for the observe operation is exceeded

  • (Libcouchbase::Error::NetworkError)

    if there was a communication issue

  • (Libcouchbase::Error::KeyNotFound)

    if the key does not exist



# File 'lib/libcouchbase/bucket.rb', line 397

def prepend(key, value, async: false, **opts)
    result @connection.store(key, value, **PrependDefaults.merge(opts)), async
end

#replace(key, value, async: false, **opts) ⇒ Libcouchbase::Result

Replace the existing object in the database

Examples:

Store the key which will be expired in 2 seconds using relative TTL.

c.replace("foo", "bar", expire_in: 2)

Store the key which will be expired in 2 seconds using absolute TTL.

c.replace(:foo, :bar, expire_at: Time.now.to_i + 2)

Force JSON document format for value

c.replace("foo", {"bar" => "baz"}, format: :document)

Set application specific flags (note that it will be OR-ed with format flags)

c.replace("foo", "bar", flags: 0x1000)

Ensure that the key is persisted on at least one node

c.replace("foo", "bar", persist_to: 1)

Parameters:

  • key (String, Symbol)

    Key used to reference the value.

  • value (Object)

    Value to be stored

  • options (Hash)

    Options for operation.

Returns:

  • (Libcouchbase::Result)

    this includes the CAS value of the object.

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server with a different CAS value to that provided

  • (Libcouchbase::Error::Timedout)

    if the timeout interval for the observe operation is exceeded

  • (Libcouchbase::Error::NetworkError)

    if there was a communication issue

  • (Libcouchbase::Error::KeyNotFound)

    if the key does not exist



# File 'lib/libcouchbase/bucket.rb', line 307

def replace(key, value, async: false, **opts)
    result @connection.store(key, value, **ReplaceDefaults.merge(opts)), async
end

#save_design_doc(data, id = nil, async: false) ⇒ Object

Update or create design doc with supplied views

Parameters:

  • data (Hash, IO, String)

    The source object containing JSON encoded design document.

# File 'lib/libcouchbase/bucket.rb', line 611

def save_design_doc(data, id = nil, async: false)
    attrs = case data
    when String
        JSON.parse(data, Connection::DECODE_OPTIONS)
    when IO
        JSON.parse(data.read, Connection::DECODE_OPTIONS)
    when Hash
        data
    else
        raise ArgumentError, "Document should be Hash, String or IO instance"
    end
    attrs[:language] ||= :javascript

    id ||= attrs.delete(:_id)
    id = id.to_s.sub(/^_design\//, '')

    result @connection.http("/_design/#{id}",
        method: :put,
        body: attrs,
        type: :view
    ), async
end
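
The input handling can be tried without a connection. The sketch below covers only the normalization step and makes two assumptions that are not confirmed by this page: that Connection::DECODE_OPTIONS symbolizes key names (symbolize_names: true is used here in its place), and that copying a Hash argument is acceptable (the source above uses the Hash as given). design_doc_attributes is a hypothetical helper name.

```ruby
require 'json'

# Hypothetical helper: returns [id, attributes] as they would be sent.
def design_doc_attributes(data, id = nil)
  attrs = case data
          when String
            # Assumption: symbolize_names stands in for Connection::DECODE_OPTIONS.
            JSON.parse(data, symbolize_names: true)
          when IO
            JSON.parse(data.read, symbolize_names: true)
          when Hash
            data.dup # copy so the caller's Hash is left untouched (differs from the source)
          else
            raise ArgumentError, "Document should be Hash, String or IO instance"
          end
  attrs[:language] ||= :javascript

  # Fall back to the document's own _id, then strip any _design/ prefix.
  id ||= attrs.delete(:_id)
  [id.to_s.sub(/^_design\//, ''), attrs]
end

from_json = design_doc_attributes('{"_id":"_design/blog","views":{}}')
from_hash = design_doc_attributes({ language: "erlang" }, :users)
```

Since the dispatch tests for IO, an object such as StringIO (which is not an IO subclass) would fall through to the ArgumentError branch.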

#set(key, value, async: false, **opts) ⇒ Libcouchbase::Result Also known as: []=

Unconditionally store the object in Couchbase

Examples:

Store the key which will be expired in 2 seconds using relative TTL.

c.set("foo", "bar", expire_in: 2)

Store the key which will be expired in 2 seconds using absolute TTL.

c.set(:foo, :bar, expire_at: Time.now.to_i + 2)

Force JSON document format for value

c.set("foo", {"bar" => "baz"}, format: :document)

Use hash-like syntax to store the value

c[:foo] = {bar: :baz}

Set application specific flags (note that it will be OR-ed with format flags)

c.set("foo", "bar", flags: 0x1000)

Perform optimistic locking by specifying last known CAS version

c.set("foo", "bar", cas: 8835713818674332672)

Ensure that the key is persisted on at least one node

c.set("foo", "bar", persist_to: 1)

Parameters:

  • key (String, Symbol)

    Key used to reference the value.

  • value (Object)

    Value to be stored

  • options (Hash)

    Options for operation.

Returns:

  • (Libcouchbase::Result)

    this includes the CAS value of the object.

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server with a different CAS value to that provided

  • (Libcouchbase::Error::Timedout)

    if the timeout interval for the observe operation is exceeded

  • (Libcouchbase::Error::NetworkError)

    if there was a communication issue



# File 'lib/libcouchbase/bucket.rb', line 259

def set(key, value, async: false, **opts)
    # default operation is set
    result @connection.store(key, value, **opts), async
end

#touch(async: false, **opts) ⇒ Object

Touch a key, changing its CAS and optionally setting a timeout



# File 'lib/libcouchbase/bucket.rb', line 533

def touch(async: false, **opts)
    result @connection.touch(**opts), async
end

#view(design, view, include_docs: true, is_spatial: false, **opts, &row_modifier) ⇒ Libcouchbase::Results

Returns an enumerable for the results in a view.

Results are lazily loaded when an operation is performed on the enumerable.



# File 'lib/libcouchbase/bucket.rb', line 549

def view(design, view, include_docs: true, is_spatial: false, **opts, &row_modifier)
    view = @connection.query_view(design, view, **ViewDefaults.merge(opts))
    view.include_docs = include_docs
    view.is_spatial = is_spatial

    current = ::Libuv::Reactor.current

    if current && current.running?
        ResultsLibuv.new(view, current, &row_modifier)
    elsif Object.const_defined?(:EventMachine) && EM.reactor_thread?
        ResultsEM.new(view, &row_modifier)
    else
        ResultsNative.new(view, &row_modifier)
    end
end

#wait_results(*results) ⇒ Array

Waits for all the async operations to complete and returns the results

Returns:

  • (Array)


# File 'lib/libcouchbase/bucket.rb', line 712

def wait_results(*results)
    result ::Libuv::Q.all(@reactor, *results.flatten)
end