Class: Couchbase::Bucket

Inherits:
Object
  • Object
show all
Includes:
Async, Operations
Defined in:
lib/couchbase/bucket.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Async

#async, #async=, #async?, #async_queue, #end_async_queue, #run, #run_async, #running, #running=, #running?

Methods included from Operations

included

Constructor Details

#initialize(url, options = {}) ⇒ Bucket #initialize(options = {}) ⇒ Bucket

Initialize new Bucket.

Examples:

Initialize connection using default options

Couchbase.new

Select custom bucket

Couchbase.new(:bucket => 'foo')
Couchbase.new('http://localhost:8091/pools/default/buckets/foo')

Connect to protected bucket

Couchbase.new(:bucket => 'protected', :username => 'protected', :password => 'secret')
Couchbase.new('http://localhost:8091/pools/default/buckets/protected',
              :username => 'protected', :password => 'secret')

Use list of nodes, in case some nodes might be dead

Couchbase.new(:node_list => ['example.com:8091', 'example.org:8091', 'example.net'])

Overloads:

  • #initialize(url, options = {}) ⇒ Bucket

    Initialize bucket using URI of the cluster and options. It is possible to override some parts of URI using the options keys (e.g. :host or :port)

    Parameters:

    • url (String)

      The full URL of management API of the cluster.

    • options (Hash) (defaults to: {})

      The options for connection. See options definition below.

  • #initialize(options = {}) ⇒ Bucket

    Initialize bucket using options only.

    Parameters:

    • options (Hash) (defaults to: {})

      The options for operation for connection

    Options Hash (options):

    • :node_list (Array) — default: nil

      the list of nodes to connect to. If specified it takes precedence over :host option. The list must be array of strings in form of host names or host names with ports (in first case port 8091 will be used, see examples).

    • :host (String) — default: "localhost"

      the hostname or IP address of the node

    • :port (Fixnum) — default: 8091

      the port of the managemenent API

    • :pool (String) — default: "default"

      the pool name

    • :bucket (String) — default: "default"

      the bucket name

    • :default_ttl (Fixnum) — default: 0

      the TTL used by default during storing key-value pairs.

    • :default_flags (Fixnum) — default: 0

      the default flags.

    • :default_format (Symbol) — default: :document

      the format, which will be used for values by default. Note that changing format will amend flags. (see #default_format)

    • :username (String) — default: nil

      the user name to connect to the cluster. Used to authenticate on management API. The username could be skipped for protected buckets, the bucket name will be used instead.

    • :password (String) — default: nil

      the password of the user.

    • :quiet (true, false) — default: false

      the flag controlling if raising exception when the client executes operations on non-existent keys. If it is true it will raise Error::NotFound exceptions. The default behaviour is to return nil value silently (might be useful in Rails cache).

    • :environment (Symbol) — default: :production

      the mode of the connection. Currently it influences only on design documents set. If the environment is :development, you will able to get design documents with ‘dev_’ prefix, otherwise (in :production mode) the library will hide them from you.

    • :key_prefix (String) — default: nil

      the prefix string which will be prepended to each key before sending out, and sripped before returning back to the application.

    • :timeout (Fixnum) — default: 2500000

      the timeout for IO operations (in microseconds)

    • :default_arithmetic_init (Fixnum, true) — default: 0

      the default initial value for arithmetic operations. Setting this option to any non positive number forces creation missing keys with given default value. Setting it to true will use zero as initial value. (see Bucket#incr and Bucket#decr).

    • :engine (Symbol) — default: :default

      the IO engine to use Currently following engines are supported:

      :default

      Built-in engine (multi-thread friendly)

      :libevent

      libevent IO plugin from libcouchbase (optional)

      :libev

      libev IO plugin from libcouchbase (optional)

      :eventmachine

      EventMachine plugin (builtin, but requires EM gem and ruby 1.9+)

    • :async (true, false) — default: false

      If true, the connection instance will be considered always asynchronous and IO interaction will be occured only when #run called. See #on_connect to hook your code after the instance will be connected.

Raises:

  • (Couchbase::Error::BucketNotFound)

    if there is no such bucket to connect to

  • (Couchbase::Error::Connect)

    if the socket wasn’t accessible (doesn’t accept connections or doesn’t respond in time)

Since:

  • 1.0.0



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# File 'lib/couchbase/bucket.rb', line 119

def initialize(url = nil, options = {})
  default_options = {
    type:                      nil,
    quiet:                     false,
    hostname:                  'localhost',
    port:                      8091,
    pool:                      'default',
    bucket:                    'default',
    password:                  '',
    engine:                    nil,
    default_ttl:               0,
    async:                     false,
    default_arithmetic_init:   0,
    default_flags:             0,
    default_format:            :document,
    default_observe_timeout:   2500000,
    on_error:                  nil,
    on_connect:                nil,
    timeout:                   0,
    environment:               nil,
    key_prefix:                nil,
    node_list:                 nil,
    destroying:                0,
    connected:                 0,
    on_connect_proc:           nil,
    async_disconnect_hook_set: 0,
    connected:                 false
  }

  url_options = if url.is_a? String
                  fail ArgumentError.new unless url =~ /^http:\/\//

                  uri = URI.new(url)

                  {
                    host: uri.host,
                    port: uri.port,
                  }.merge(path_to_pool_and_bucket(uri.path))
                elsif url.nil?
                  {}
                else
                  url
                end

  connection_options = default_options.merge(options).merge(url_options)

  connection_options.each_pair do |key, value|
    instance_variable_set("@#{key}", value)
  end

  @transcoders = {
    document: Transcoder::Document.new,
    marshal:  Transcoder::Marshal.new,
    plain:    Transcoder::Plain.new
  }

  @transcoder = @transcoders[@default_format]

  connect unless async?
end

Instance Attribute Details

#bucketObject

Returns the value of attribute bucket.



21
22
23
# File 'lib/couchbase/bucket.rb', line 21

def bucket
  @bucket
end

#clientObject (readonly)

Returns the value of attribute client.



25
26
27
# File 'lib/couchbase/bucket.rb', line 25

def client
  @client
end

#default_arithmetic_initObject

Returns the value of attribute default_arithmetic_init.



21
22
23
# File 'lib/couchbase/bucket.rb', line 21

def default_arithmetic_init
  @default_arithmetic_init
end

#default_formatObject (readonly)

Returns the value of attribute default_format.



25
26
27
# File 'lib/couchbase/bucket.rb', line 25

def default_format
  @default_format
end

#default_ttlObject

Returns the value of attribute default_ttl.



21
22
23
# File 'lib/couchbase/bucket.rb', line 21

def default_ttl
  @default_ttl
end

#hostnameObject

Returns the value of attribute hostname.



21
22
23
# File 'lib/couchbase/bucket.rb', line 21

def hostname
  @hostname
end

#key_prefixObject (readonly)

Returns the value of attribute key_prefix.



25
26
27
# File 'lib/couchbase/bucket.rb', line 25

def key_prefix
  @key_prefix
end

#passwordObject

Returns the value of attribute password.



21
22
23
# File 'lib/couchbase/bucket.rb', line 21

def password
  @password
end

#poolObject

Returns the value of attribute pool.



21
22
23
# File 'lib/couchbase/bucket.rb', line 21

def pool
  @pool
end

#portObject

Returns the value of attribute port.



21
22
23
# File 'lib/couchbase/bucket.rb', line 21

def port
  @port
end

#quietObject

Returns the value of attribute quiet.



21
22
23
# File 'lib/couchbase/bucket.rb', line 21

def quiet
  @quiet
end

#timeoutObject

Returns the value of attribute timeout.



21
22
23
# File 'lib/couchbase/bucket.rb', line 21

def timeout
  @timeout
end

#transcoderObject

Returns the value of attribute transcoder.



21
22
23
# File 'lib/couchbase/bucket.rb', line 21

def transcoder
  @transcoder
end

#usernameObject

Returns the value of attribute username.



21
22
23
# File 'lib/couchbase/bucket.rb', line 21

def username
  @username
end

Instance Method Details

#authorityObject



211
212
213
# File 'lib/couchbase/bucket.rb', line 211

def authority
  "#{hostname}:#{port}"
end

#base_urlObject



215
216
217
# File 'lib/couchbase/bucket.rb', line 215

def base_url
  "http://#{authority}/pools"
end

#cas(key, options = {}) {|value| ... } ⇒ Fixnum Also known as: compare_and_swap

Compare and swap value.

Reads a key’s value from the server and yields it to a block. Replaces the key’s value with the result of the block as long as the key hasn’t been updated in the meantime, otherwise raises Error::KeyExists. CAS stands for “compare and swap”, and avoids the need for manual key mutexing. Read more info here:

In asynchronous mode it will yield result twice, first for Bucket#get with Result#operation equal to :get and second time for Bucket#set with Result#operation equal to :set.

Examples:

Implement append to JSON encoded value


c.default_format = :document
c.set("foo", {"bar" => 1})
c.cas("foo") do |val|
  val["baz"] = 2
  val
end
c.get("foo")      #=> {"bar" => 1, "baz" => 2}

Append JSON encoded value asynchronously


c.default_format = :document
c.set("foo", {"bar" => 1})
c.run do
  c.cas("foo") do |val|
    case val.operation
    when :get
      val["baz"] = 2
      val
    when :set
      # verify all is ok
      puts "error: #{ret.error.inspect}" unless ret.success?
    end
  end
end
c.get("foo")      #=> {"bar" => 1, "baz" => 2}

Parameters:

  • key (String, Symbol)
  • options (Hash) (defaults to: {})

    the options for “swap” part

Options Hash (options):

  • :ttl (Fixnum) — default: self.default_ttl

    the time to live of this key

  • :format (Symbol) — default: self.default_format

    format of the value

  • :flags (Fixnum) — default: self.default_flags

    flags for this key

Yield Parameters:

  • value (Object, Result)

    old value in synchronous mode and Result object in asynchronous mode.

Yield Returns:

  • (Object)

    new value.

Returns:

  • (Fixnum)

    the CAS of new value

Raises:

  • (Couchbase::Error::KeyExists)

    if the key was updated before the the code in block has been completed (the CAS value has been changed).

  • (ArgumentError)

    if the block is missing for async mode

See Also:

Since:

  • 1.0.0



314
315
316
317
318
319
320
321
322
323
324
325
326
# File 'lib/couchbase/bucket.rb', line 314

def cas(key, options = {})
  if async?
    block = Proc.new
    get(key) do |ret|
      val = block.call(ret) # get new value from caller
      set(ret.key, val, options.merge(:cas => ret.cas, &block))
    end
  else
    val, flags, ver = get(key, :extended => true)
    val = yield(val) # get new value from caller
    set(key, val, options.merge(:cas => ver))
  end
end

#connectObject Also known as: reconnect



188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/couchbase/bucket.rb', line 188

def connect
  uris = if @node_list
           Array(@node_list).map { |n| URI.new(n) }
         else
           Array(URI.new(base_url))
         end

  begin
    builder = CouchbaseConnectionFactoryBuilder.new
    builder.setTranscoder(@transcoder)
    connection_factory = builder.buildCouchbaseConnection(uris, bucket.to_java_string, password.to_java_string)
    @client = CouchbaseClient.new(connection_factory)
    @connected = true
  rescue Java::ComCouchbaseClientVbucket::ConfigurationException
    fail Couchbase::Error::Auth
  rescue java.net.ConnectException => e
    fail Couchbase::Error::Connect
  end

  self
end

#connected?Boolean

Returns:

  • (Boolean)


223
224
225
# File 'lib/couchbase/bucket.rb', line 223

def connected?
  @connected
end

#create_periodic_timer(interval, &block) ⇒ Couchbase::Timer

Create and register periodic timer

Returns:

  • (Couchbase::Timer)


380
381
382
# File 'lib/couchbase/bucket.rb', line 380

def create_periodic_timer(interval, &block)
  Timer.new(self, interval, :periodic => true, &block)
end

#create_timer(interval, &block) ⇒ Couchbase::Timer

Create and register one-shot timer

Returns:

  • (Couchbase::Timer)


373
374
375
# File 'lib/couchbase/bucket.rb', line 373

def create_timer(interval, &block)
  Timer.new(self, interval, &block)
end

#disconnectObject



227
228
229
230
231
232
233
234
235
236
# File 'lib/couchbase/bucket.rb', line 227

def disconnect
  if connected?
    @client.shutdown(3, TimeUnit::SECONDS)
    @client = nil
    @connection_factory = nil
    @connected = false
  else
    fail Couchbase::Error::Connect
  end
end

#flush {|ret| ... } ⇒ true

Delete contents of the bucket

Examples:

Simple flush the bucket

c.flush    #=> true

Asynchronous flush

c.run do
  c.flush do |ret|
    ret.operation   #=> :flush
    ret.success?    #=> true
    ret.status      #=> 200
  end
end

Yield Parameters:

  • ret (Result)

    the object with error, status and operation attributes.

Returns:

  • (true)

    always return true (see raise section)

Raises:

  • (Couchbase::Error::Protocol)

    in case of an error is encountered. Check Error::Base#status for detailed code.

See Also:

Since:

  • 1.2.0.beta



354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
# File 'lib/couchbase/bucket.rb', line 354

def flush
  if !async? && block_given?
    sync_block_error
  end
  req = make_http_request("/pools/default/buckets/#{bucket}/controller/doFlush",
                          :type => :management, :method => :post, :extended => true)
  res = nil
  req.on_body do |r|
    res = r
    res.instance_variable_set("@operation", :flush)
    yield(res) if block_given?
  end
  req.continue
  true
end

#hostObject



184
185
186
# File 'lib/couchbase/bucket.rb', line 184

def host
  hostname
end

#observe_and_wait(*keys, &block) ⇒ Fixnum, Hash<String, Fixnum>

Wait for persistence condition

This operation is useful when some confidence needed regarding the state of the keys. With two parameters :replicated and :persisted it allows to set up the waiting rule.

Parameters:

  • keys (String, Symbol, Array, Hash)

    The list of the keys to observe. Full form is hash with key-cas value pairs, but there are also shortcuts like just Array of keys or single key. CAS value needed to when you need to ensure that the storage persisted exactly the same version of the key you are asking to observe.

  • options (Hash)

    The options for operation

Returns:

  • (Fixnum, Hash<String, Fixnum>)

    will return CAS value just like mutators or pairs key-cas in case of multiple keys.

Raises:

  • (Couchbase::Error::Timeout)

    if the given time is up

Since:

  • 1.2.0.dp6



408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
# File 'lib/couchbase/bucket.rb', line 408

def observe_and_wait(*keys, &block)
  options = {:timeout => default_observe_timeout}
  options.update(keys.pop) if keys.size > 1 && keys.last.is_a?(Hash)
  verify_observe_options(options)
  if block && !async?
    raise ArgumentError, "synchronous mode doesn't support callbacks"
  end
  if keys.size == 0
    raise ArgumentError, "at least one key is required"
  end
  if keys.size == 1 && keys[0].is_a?(Hash)
    key_cas = keys[0]
  else
    key_cas = keys.flatten.reduce({}) do |h, kk|
      h[kk] = nil   # set CAS to nil
      h
    end
  end
  if async?
    do_observe_and_wait(key_cas, options, &block)
  else
    res = do_observe_and_wait(key_cas, options, &block) while res.nil?
    unless async?
      if keys.size == 1 && (keys[0].is_a?(String) || keys[0].is_a?(Symbol))
        return res.values.first
      else
        return res
      end
    end
  end
end

#on_connect(&block) ⇒ Object



238
239
240
# File 'lib/couchbase/bucket.rb', line 238

def on_connect(&block)
  @on_connect = block
end

#on_error(&block) ⇒ Object



242
243
244
# File 'lib/couchbase/bucket.rb', line 242

def on_error(&block)
  @on_error = block
end

#quiet?Boolean

Returns:

  • (Boolean)


180
181
182
# File 'lib/couchbase/bucket.rb', line 180

def quiet?
  !!quiet
end

#urlObject



219
220
221
# File 'lib/couchbase/bucket.rb', line 219

def url
  "http://#{authority}/pools/#{pool}/buckets/#{bucket}/"
end

#versionObject



246
247
248
249
250
251
252
# File 'lib/couchbase/bucket.rb', line 246

def version
  {}.tap do |hash|
    @client.getVersions.to_hash.each_pair do |ip, ver|
      hash[ip.to_s] = ver
    end
  end
end