Class: Libcouchbase::Bucket

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/libcouchbase/bucket.rb

Constant Summary collapse

AddDefaults =
{operation: :add}.freeze
ReplaceDefaults =
{operation: :replace}.freeze
AppendDefaults =
{operation: :append}.freeze
PrependDefaults =
{operation: :prepend}.freeze
ViewDefaults =
{
    on_error: :stop,
    stale: false
}
FtsDefaults =
{
    include_docs: true,
    size: 10000, # Max result size
    from: 0,
    explain: false
}

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(**options) ⇒ Bucket



17
18
19
20
21
22
23
24
25
26
27
# File 'lib/libcouchbase/bucket.rb', line 17

def initialize(**options)
    @connection = Connection.new(**options)
    connect

    # This obtains the connections reactor
    @reactor = reactor
    @quiet = true

    # clean up the connection once this object is garbage collected
    ObjectSpace.define_finalizer( self, self.class.finalize(@connection) )
end

Instance Attribute Details

#connectionObject (readonly)

Returns the value of attribute connection.



30
31
32
# File 'lib/libcouchbase/bucket.rb', line 30

def connection
  @connection
end

#quietObject

Returns the value of attribute quiet.



31
32
33
# File 'lib/libcouchbase/bucket.rb', line 31

def quiet
  @quiet
end

Class Method Details

.finalize(connection) ⇒ Object



13
14
15
# File 'lib/libcouchbase/bucket.rb', line 13

def self.finalize(connection)
    proc { connection.destroy }
end

Instance Method Details

#add(key, value, async: false, **opts) ⇒ Libcouchbase::Result

Add the item to the database, but fail if the object exists already

Examples:

Store the key which will be expired in 2 seconds using relative TTL.

c.add("foo", "bar", expire_in: 2)

Store the key which will be expired in 2 seconds using absolute TTL.

c.add(:foo, :bar, expire_at: Time.now.to_i + 2)

Force JSON document format for value

c.add("foo", {"bar" => "baz"}, format: :document)

Set application specific flags (note that it will be OR-ed with format flags)

c.add("foo", "bar", flags: 0x1000)

Ensure that the key will be persisted at least on the one node

c.add("foo", "bar", persist_to: 1)

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server

  • (Libouchbase::Error::Timedout)

    if timeout interval for observe exceeds

  • (Libouchbase::Error::NetworkError)

    if there was a communication issue



191
192
193
# File 'lib/libcouchbase/bucket.rb', line 191

def add(key, value, async: false, **opts)
    result @connection.store(key, value, **AddDefaults.merge(opts)), async
end

#append(key, value, async: false, **opts) ⇒ Libcouchbase::Result

Note:

This operation is kind of data-aware from server point of view. This mean that the server treats value as binary stream and just perform concatenation, therefore it won't work with :marshal and :document formats, because of lack of knowledge how to merge values in these formats.

Append this object to the existing object

Examples:

Simple append

c.set(:foo, "aaa", format: :plain)
c.append(:foo, "bbb")
c.get("foo")           #=> "aaabbb"

Using optimistic locking. The operation will fail on CAS mismatch

resp = c.set("foo", "aaa", format: :plain)
c.append("foo", "bbb", cas: resp.cas)

Ensure that the key will be persisted at least on the one node

c.append("foo", "bar", persist_to: 1)

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server with a different CAS value to that provided

  • (Libouchbase::Error::Timedout)

    if timeout interval for observe exceeds

  • (Libouchbase::Error::NetworkError)

    if there was a communication issue

  • (Libcouchbase::Error::KeyNotFound)

    if the key doesn't exists



336
337
338
# File 'lib/libcouchbase/bucket.rb', line 336

def append(key, value, async: false, **opts)
    result @connection.store(key, value, **AppendDefaults.merge(opts)), async
end

#compare_and_swap(key, **opts) {|value| ... } ⇒ Libcouchbase::Response Also known as: cas

Compare and swap value.

Reads a key’s value from the server and yields it to a block. Replaces the key’s value with the result of the block as long as the key hasn’t been updated in the meantime, otherwise raises Error::KeyExists.

Setting the :retry option to a positive number will cause this method to rescue the Error::KeyExists error that happens when an update collision is detected, and automatically get a fresh copy of the value and retry the block. This will repeat as long as there continues to be conflicts, up to the maximum number of retries specified.

Examples:

Implement append to JSON encoded value


c.set(:foo, {bar: 1})
c.cas(:foo) do |val|
  val[:baz] = 2
  val
end
c.get(:foo)      #=> {bar: 1, baz: 2}

Yield Parameters:

  • value (Object)

    existing value

Yield Returns:

  • (Object)

    new value.

Raises:

  • (Couchbase::Error::KeyExists)

    if the key was updated before the the code in block has been completed (the CAS value has been changed).

  • (ArgumentError)

    if the block is missing



665
666
667
668
669
670
671
672
673
674
675
676
677
678
# File 'lib/libcouchbase/bucket.rb', line 665

def compare_and_swap(key, **opts)
    retries = opts.delete(:retry) || 0
    begin
        current = result(@connection.get(key))
        new_value = yield current.value, opts
        opts[:cas] = current.cas

        set(key, new_value, **opts)
    rescue Libcouchbase::Error::KeyExists
        retries -= 1
        retry if retries >= 0
        raise
    end
end

#decr(key, by = 1, **opts) ⇒ Object

Decrement the value of an existing numeric key

Helper method, see incr



447
448
449
# File 'lib/libcouchbase/bucket.rb', line 447

def decr(key, by = 1, **opts)
    incr(key, -by, **opts)
end

#delete(key, async: false, quiet: @quiet, **opts) ⇒ true, false

Delete the specified key

Examples:

Delete the key in quiet mode (default)

c.set("foo", "bar")
c.delete("foo")        #=> true
c.delete("foo")        #=> false

Delete the key verbosely

c.set("foo", "bar")
c.delete("foo", quiet: false)   #=> true
c.delete("foo", quiet: true)    #=> nil (default behaviour)
c.delete("foo", quiet: false)   #=> will raise Libcouchbase::Error::KeyNotFound

Delete the key with version check

res = c.set("foo", "bar")       #=> #<struct Libcouchbase::Response callback=:callback_set, key="foo", cas=1975457268957184, value="bar", metadata={:format=>:document, :flags=>0}>
c.delete("foo", cas: 123456)    #=> will raise Libcouchbase::Error::KeyExists
c.delete("foo", cas: res.cas)   #=> true

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server with a different CAS value to that provided

  • (Libouchbase::Error::Timedout)

    if timeout interval for observe exceeds

  • (Libouchbase::Error::NetworkError)

    if there was a communication issue

  • (Libcouchbase::Error::KeyNotFound)

    if the key doesn't exists



487
488
489
490
491
492
493
494
495
496
497
498
499
# File 'lib/libcouchbase/bucket.rb', line 487

def delete(key, async: false, quiet: @quiet, **opts)
    promise = @connection.remove(key, **opts).then { true }
    if quiet
        promise = promise.catch { |error|
            if error.is_a? Libcouchbase::Error::KeyNotFound
                false
            else
                ::Libuv::Q.reject(@reactor, error)
            end
        }
    end
    result promise, async
end

#delete_design_doc(id, rev = nil, async: false) ⇒ Object

Delete design doc with given id and optional revision.



624
625
626
627
628
# File 'lib/libcouchbase/bucket.rb', line 624

def delete_design_doc(id, rev = nil, async: false)
    id = id.to_s.sub(/^_design\//, '')
    rev = "?rev=#{rev}" if rev
    result @connection.http("/_design/#{id}#{rev}", method: :delete, type: :view), async
end

#design_docs(**opts) ⇒ Libcouchbase::DesignDocs

Fetch design docs stored in current bucket



524
525
526
# File 'lib/libcouchbase/bucket.rb', line 524

def design_docs(**opts)
    DesignDocs.new(self, @connection, method(:result), **opts)
end

#flush(async: false) ⇒ Libcouchbase::Response

Delete contents of the bucket

Examples:

Simple flush the bucket

c.flush

Raises:

  • (Libcouchbase::Error::HttpError)

    in case of an error is encountered.

See Also:



512
513
514
# File 'lib/libcouchbase/bucket.rb', line 512

def flush(async: false)
    result @connection.flush, async
end

#full_text_search(index, query, **opts, &row_modifier) ⇒ Libcouchbase::Results

Returns an enumerable for the results in a full text search.

Results are lazily loaded when an operation is performed on the enum



558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
# File 'lib/libcouchbase/bucket.rb', line 558

def full_text_search(index, query, **opts, &row_modifier)
    if query.is_a? Hash
        opts[:query] = query
    else
        opts[:query] = {query: query}
    end
    fts = @connection.full_text_search(index, **FtsDefaults.merge(opts))

    current = ::Libuv::Reactor.current
    if current && current.running?
        ResultsLibuv.new(fts, current, &row_modifier)
    elsif Object.const_defined?(:EventMachine) && EM.reactor_thread?
        ResultsEM.new(fts, &row_modifier)
    else
        ResultsNative.new(fts, &row_modifier)
    end
end

#get(*keys, extended: false, async: false, quiet: @quiet, assemble_hash: false, **opts) ⇒ Object, ... Also known as: []

Obtain an object stored in Couchbase by given key.

Examples:

Get single value in quiet mode (the default)

c.get("foo")     #=> the associated value or nil

Use alternative hash-like syntax

c["foo"]         #=> the associated value or nil

Get single value in verbose mode

c.get("missing-foo", quiet: false)  #=> raises Libcouchbase::Error::NotFound

Get multiple keys

c.get("foo", "bar", "baz")   #=> [val1, val2, val3]

Get multiple keys with assembing result into the Hash

c.get("foo", "bar", "baz", assemble_hash: true)
#=> {"foo" => val1, "bar" => val2, "baz" => val3}

Get and lock key using default timeout

c.get("foo", lock: true)  # This locks for the maximum time of 30 seconds

Get and lock key using custom timeout

c.get("foo", lock: 3)

Get and lock multiple keys using custom timeout

c.get("foo", "bar", lock: 3)

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server with a different CAS value to that provided

  • (Libouchbase::Error::Timedout)

    if timeout interval for observe exceeds

  • (Libouchbase::Error::NetworkError)

    if there was a communication issue

  • (Libcouchbase::Error::KeyNotFound)

    if the key doesn't exists



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/libcouchbase/bucket.rb', line 83

def get(*keys, extended: false, async: false, quiet: @quiet, assemble_hash: false, **opts)
    keys = keys.flatten

    if keys.length == 1
        promise = @connection.get(keys[0], **opts)

        unless extended
            promise = promise.then(proc { |resp|
                resp.value
            })
        end

        if quiet
            promise = promise.catch { |err|
                if err.is_a? Libcouchbase::Error::KeyNotFound
                    nil
                else
                    ::Libuv::Q.reject(@reactor, err)
                end
            }
        end

        if assemble_hash
            promise = promise.then(proc { |val|
                hash = defined?(::HashWithIndifferentAccess) ? ::HashWithIndifferentAccess.new : {}
                hash[keys[0]] = val
                hash
            })
        end

        result(promise, async)
    else
        promises = keys.collect { |key|
            @connection.get(key, **opts)
        }

        if quiet
            promises.map! { |prom|
                prom.catch { |err|
                    if err.is_a? Libcouchbase::Error::KeyNotFound
                        nil
                    else
                        ::Libuv::Q.reject(@reactor, err)
                    end
                }
            }
        end

        result(@reactor.all(*promises).then(proc { |results|
            if not extended
                # Check if resp nil as might have been a quiet request
                results.collect! { |resp| resp.value if resp }
            end

            if assemble_hash
                hash = defined?(::HashWithIndifferentAccess) ? ::HashWithIndifferentAccess.new : {}
                keys.each_with_index do |key, index|
                    hash[key] = results[index]
                end
                hash
            else
                results
            end
        }), async)
    end
end

#get_num_nodesInteger

The numbers of nodes in the cluster



689
690
691
# File 'lib/libcouchbase/bucket.rb', line 689

def get_num_nodes
    result @connection.get_num_nodes
end

#get_num_replicasInteger

The numbers of the replicas for each node in the cluster



683
684
685
# File 'lib/libcouchbase/bucket.rb', line 683

def get_num_replicas
    result @connection.get_num_replicas
end

#incr(key, by = 1, create: false, extended: false, async: false, **opts) ⇒ Integer

Increment the value of an existing numeric key

The increment method allow you to increase or decrease a given stored integer value. Updating the value of a key if it can be parsed to an integer. The update operation occurs on the server and is provided at the protocol level. This simplifies what would otherwise be a two-stage get and set operation.

Examples:

Increment key by one

c.incr(:foo)

Increment key by 50

c.incr("foo", 50)

Increment key by one OR initialize with zero

c.incr("foo", create: true)   #=> will return old+1 or 0

Increment key by one OR initialize with three

c.incr("foo", 50, initial: 3) #=> will return old+50 or 3

Increment key and get its CAS value

resp = c.incr("foo", :extended => true)
resp.cas   #=> 12345
resp.value #=> 2

Raises:

  • (Libouchbase::Error::Timedout)

    if timeout interval for observe exceeds

  • (Libouchbase::Error::NetworkError)

    if there was a communication issue

  • (Libcouchbase::Error::KeyNotFound)

    if the key doesn't exists

  • (Libcouchbase::Error::DeltaBadval)

    if the key contains non-numeric value



434
435
436
437
438
439
440
441
442
# File 'lib/libcouchbase/bucket.rb', line 434

def incr(key, by = 1, create: false, extended: false, async: false, **opts)
    opts[:delta] ||= by
    opts[:initial] = 0 if create
    promise = @connection.counter(key, **opts)
    if not extended
        promise = promise.then { |resp| resp.value }
    end
    result promise, async
end

#n1ql(**options) ⇒ Libcouchbase::N1QL

Returns an n1ql query builder.



585
586
587
# File 'lib/libcouchbase/bucket.rb', line 585

def n1ql(**options)
    N1QL.new(self, **options)
end

#prepend(key, value, async: false, **opts) ⇒ Libcouchbase::Result

Note:

This operation is kind of data-aware from server point of view. This mean that the server treats value as binary stream and just perform concatenation, therefore it won't work with :marshal and :document formats, because of lack of knowledge how to merge values in these formats.

Prepend this object to the existing object

Examples:

Simple prepend

c.set(:foo, "aaa", format: :plain)
c.prepend(:foo, "bbb")
c.get("foo")           #=> "bbbaaa"

Using optimistic locking. The operation will fail on CAS mismatch

resp = c.set("foo", "aaa", format: :plain)
c.prepend("foo", "bbb", cas: resp.cas)

Ensure that the key will be persisted at least on the one node

c.prepend("foo", "bar", persist_to: 1)

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server with a different CAS value to that provided

  • (Libouchbase::Error::Timedout)

    if timeout interval for observe exceeds

  • (Libouchbase::Error::NetworkError)

    if there was a communication issue

  • (Libcouchbase::Error::KeyNotFound)

    if the key doesn't exists



381
382
383
# File 'lib/libcouchbase/bucket.rb', line 381

def prepend(key, value, async: false, **opts)
    result @connection.store(key, value, **PrependDefaults.merge(opts)), async
end

#replace(key, value, async: false, **opts) ⇒ Libcouchbase::Result

Replace the existing object in the database

Examples:

Store the key which will be expired in 2 seconds using relative TTL.

c.replace("foo", "bar", expire_in: 2)

Store the key which will be expired in 2 seconds using absolute TTL.

c.replace(:foo, :bar, expire_at: Time.now.to_i + 2)

Force JSON document format for value

c.replace("foo", {"bar" => "baz"}, format: :document)

Set application specific flags (note that it will be OR-ed with format flags)

c.replace("foo", "bar", flags: 0x1000)

Ensure that the key will be persisted at least on the one node

c.replace("foo", "bar", persist_to: 1)

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server with a different CAS value to that provided

  • (Libouchbase::Error::Timedout)

    if timeout interval for observe exceeds

  • (Libouchbase::Error::NetworkError)

    if there was a communication issue

  • (Libcouchbase::Error::KeyNotFound)

    if the key doesn't exists



291
292
293
# File 'lib/libcouchbase/bucket.rb', line 291

def replace(key, value, async: false, **opts)
    result @connection.store(key, value, **ReplaceDefaults.merge(opts)), async
end

#save_design_doc(data, id = nil, async: false) ⇒ Object

Update or create design doc with supplied views



595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
# File 'lib/libcouchbase/bucket.rb', line 595

def save_design_doc(data, id = nil, async: false)
    attrs = case data
    when String
        JSON.parse(data, Connection::DECODE_OPTIONS)
    when IO
        JSON.parse(data.read, Connection::DECODE_OPTIONS)
    when Hash
        data
    else
        raise ArgumentError, "Document should be Hash, String or IO instance"
    end
    attrs[:language] ||= :javascript

    id ||= attrs.delete(:_id)
    id = id.to_s.sub(/^_design\//, '')

    result @connection.http("/_design/#{id}",
        method: :put,
        body: attrs,
        type: :view
    ), async
end

#set(key, value, async: false, **opts) ⇒ Libcouchbase::Result Also known as: []=

Unconditionally store the object in the Couchbase

Examples:

Store the key which will be expired in 2 seconds using relative TTL.

c.set("foo", "bar", expire_in: 2)

Store the key which will be expired in 2 seconds using absolute TTL.

c.set(:foo, :bar, expire_at: Time.now.to_i + 2)

Force JSON document format for value

c.set("foo", {"bar" => "baz"}, format: :document)

Use hash-like syntax to store the value

c[:foo] = {bar: :baz}

Set application specific flags (note that it will be OR-ed with format flags)

c.set("foo", "bar", flags: 0x1000)

Perform optimistic locking by specifying last known CAS version

c.set("foo", "bar", cas: 8835713818674332672)

Ensure that the key will be persisted at least on the one node

c.set("foo", "bar", persist_to: 1)

Raises:

  • (Libcouchbase::Error::KeyExists)

    if the key already exists on the server with a different CAS value to that provided

  • (Libouchbase::Error::Timedout)

    if timeout interval for observe exceeds

  • (Libouchbase::Error::NetworkError)

    if there was a communication issue



243
244
245
246
# File 'lib/libcouchbase/bucket.rb', line 243

def set(key, value, async: false, **opts)
    # default operation is set
    result @connection.store(key, value, **opts), async
end

#touch(async: false, **opts) ⇒ Object

Touch a key, changing its CAS and optionally setting a timeout



517
518
519
# File 'lib/libcouchbase/bucket.rb', line 517

def touch(async: false, **opts)
    result @connection.touch(**opts), async
end

#view(design, view, include_docs: true, is_spatial: false, **opts, &row_modifier) ⇒ Libcouchbase::Results

Returns an enumerable for the results in a view.

Results are lazily loaded when an operation is performed on the enum



533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
# File 'lib/libcouchbase/bucket.rb', line 533

def view(design, view, include_docs: true, is_spatial: false, **opts, &row_modifier)
    view = @connection.query_view(design, view, **ViewDefaults.merge(opts))
    view.include_docs = include_docs
    view.is_spatial = is_spatial

    current = ::Libuv::Reactor.current

    if current && current.running?
        ResultsLibuv.new(view, current, &row_modifier)
    elsif Object.const_defined?(:EventMachine) && EM.reactor_thread?
        ResultsEM.new(view, &row_modifier)
    else
        ResultsNative.new(view, &row_modifier)
    end
end

#wait_results(*results) ⇒ Array

Waits for all the async operations to complete and returns the results



696
697
698
# File 'lib/libcouchbase/bucket.rb', line 696

def wait_results(*results)
    result ::Libuv::Q.all(@reactor, *results.flatten)
end