Class: ZooKeeper::Client

Inherits:
Object
  • Object
Includes:
Operations
Defined in:
lib/zkruby/client.rb,
lib/zkruby/util.rb

Overview

Client API

All calls operate asynchronously or synchronously, depending on whether a block is supplied.

Without a block, requests are executed synchronously and either return results directly or raise an Error.

With a block, the request returns immediately with an AsyncOp. When the server responds, the block is passed the results. Errors are sent to an error callback if one is registered on the AsyncOp.

Requests that take a watch argument can be passed one of:

* An object that quacks like a Watcher
* A Proc, which will be invoked with the arguments state, path, event
* The literal value "true", which refers to the default watcher registered with the session

Registered watches will be fired exactly once for a given path, with either the expected event or with state :expired and event :none when the session is finalised.
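
The two calling styles can be sketched as follows. This is an illustration only, not part of the library: `read_sync` and `read_async` are hypothetical helper names, and `zk` is assumed to be a connected client (see ZooKeeper.connect). The `errback` call on the AsyncOp is taken from its use in the mkpath and rmpath sources further down this page.

```ruby
# Hypothetical helpers; `zk` is assumed to be a connected ZooKeeper::Client.

# Synchronous style: no block is given, so the results are returned
# directly (or an Error is raised).
def read_sync(zk, path)
  _stat, data = zk.get(path)
  data
end

# Asynchronous style: a block is given, so the call returns an AsyncOp
# immediately. The block receives the results when the server responds;
# errors are passed to the errback registered on the AsyncOp.
def read_async(zk, path, on_data, on_error)
  op = zk.get(path) { |_stat, data| on_data.call(data) }
  op.errback { |err| on_error.call(err) }
  op
end
```

Passing the callbacks in as arguments keeps the sketch independent of how the caller chooses to handle results and errors.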

Constant Summary

Constants included from Operations

Operations::CREATE_OPTS

Instance Method Summary

Constructor Details

#initialize(binding) ⇒ Client

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

See ZooKeeper.connect



# File 'lib/zkruby/client.rb', line 263

def initialize(binding)
    @binding = binding
end

Instance Method Details

#children(path, watch = nil) ⇒ Data::Stat, Array<String> #children(path, watch = nil) {|stat, children| ... } ⇒ AsyncOp

Retrieve the list of children at the given path

Overloads:

  • #children(path, watch = nil) ⇒ Data::Stat, Array<String>

    Returns the stat of path and the list of child nodes.

    Parameters:

    • path (String)
    • watch (Watcher) (defaults to: nil)

      if supplied, sets a child watch on the given path

    Returns:

    • (Data::Stat, Array<String>)

      the stat of path and the list of child nodes

    Raises:

  • #children(path, watch = nil) {|stat, children| ... } ⇒ AsyncOp

    Returns asynchronous operation.

    Yield Parameters:

    • stat (Data::Stat)

      current stat of path

    • children (Array<String>)

      the list of child nodes at path

    Returns:

    • (AsyncOp)

      asynchronous operation



# File 'lib/zkruby/client.rb', line 295

def children(path,watch=nil,&callback)
    return synchronous_call(:children,path,watch) unless block_given?
    path = chroot(path) 

    req = Proto::GetChildren2Request.new(:path => path, :watch => watch)
    queue_request(req,:get_children2,12,Proto::GetChildren2Response,:children,watch) do | response |
        callback.call(response.stat, response.children.to_a)
    end
end
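
As an illustration of a child watch, the sketch below lists the full paths of a node's children and registers a Proc as the watch. `watched_child_paths` is a hypothetical helper and `zk` is assumed to be a connected client. The Proc's arguments follow the state, path, event order given in the overview.

```ruby
# Hypothetical helper; `zk` is assumed to be a connected ZooKeeper::Client.
# Returns the full paths of the children of `path` and registers a child
# watch that forwards its arguments to `on_change` when it fires.
def watched_child_paths(zk, path, on_change)
  watch = proc { |state, watched_path, event| on_change.call(state, watched_path, event) }
  _stat, children = zk.children(path, watch)
  # File.join avoids a doubled slash when `path` is the root "/".
  children.map { |child| File.join(path, child) }
end
```

Because a watch fires only once, a caller that wants continuous notification would need to call the helper again from inside `on_change`.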

#close ⇒ Object #close { ... } ⇒ AsyncOp

Close the session

Overloads:

  • #close ⇒ Object

    Raises:

  • #close { ... } ⇒ AsyncOp

    Returns asynchronous operation.

    Yields:

    • callback invoked when session is closed

    Returns:

    • (AsyncOp)

      asynchronous operation



# File 'lib/zkruby/client.rb', line 452

def close(&blk)
    return synchronous_call(:close) unless block_given?
    @binding.close(&blk)
end
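
One way to make sure a session is always closed is to wrap the work in an `ensure` clause. The sketch below assumes `zk` is a connected client; `with_session` is a hypothetical helper name, not part of the library.

```ruby
# Hypothetical helper; `zk` is assumed to be a connected ZooKeeper::Client.
# Yields the client to the block and closes the session afterwards, even
# when the block raises. #close is called without a block here, so it
# runs synchronously.
def with_session(zk)
  yield zk
ensure
  zk.close
end
```

The helper returns whatever the block returns, since an `ensure` clause does not change the method's result.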

#create(path, data, acl, *modeopts) ⇒ String #create(path, data, acl, *modeopts) {|path| ... } ⇒ AsyncOp

Create a node

Overloads:

  • #create(path, data, acl, *modeopts) ⇒ String

    Synchronous style

    Parameters:

    • path (String)

      the base name of the path to create

    • data (String)

      the content to store at path

    • acl (Data::ACL)

      the access control list to apply to the new node

    • modeopts (Symbol, ...)

      combination of :sequential, :ephemeral

    Returns:

    • (String)

      the created path, only different if :sequential is requested

    Raises:

  • #create(path, data, acl, *modeopts) {|path| ... } ⇒ AsyncOp

    Returns asynchronous operation.

    Yield Parameters:

    • path (String)

      the created path

    Returns:

    • (AsyncOp)

      asynchronous operation



# File 'lib/zkruby/client.rb', line 317

def create(path,data,acl,*modeopts,&callback)
    op_create(path,data,acl,*modeopts,&callback)
end
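
As a sketch of the mode options, the helper below creates an ephemeral, sequential node, as is commonly done for group membership. `create_member` and the `member-` prefix are hypothetical. `zk` is assumed to be a connected client and `acl` a Data::ACL such as ZK::ACL_OPEN_UNSAFE.

```ruby
# Hypothetical helper; `zk` is assumed to be a connected ZooKeeper::Client
# and `acl` a Data::ACL (for example ZK::ACL_OPEN_UNSAFE).
# Creates an ephemeral, sequential node under `parent` and returns the
# path that was actually created. Because :sequential is requested, the
# returned path differs from the base name passed in.
def create_member(zk, parent, data, acl)
  zk.create("#{parent}/member-", data, acl, :ephemeral, :sequential)
end
```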

#delete(path, version) ⇒ Object #delete(path, version) { ... } ⇒ AsyncOp

Delete path

Overloads:

  • #delete(path, version) ⇒ Object

    Parameters:

    • path (String)
    • version (Fixnum)

      the expected version to be deleted (-1 to match any version)

    Returns:

    Raises:

  • #delete(path, version) { ... } ⇒ AsyncOp

    Yields:

    • callback invoked if delete is successful

    Returns:



# File 'lib/zkruby/client.rb', line 372

def delete(path,version,&callback)
    op_delete(path,version,&callback)
end

#exists(path, watch = nil) ⇒ Data::Stat #exists(path, watch = nil) {|stat| ... } ⇒ AsyncOp Also known as: exists?, stat

Retrieve the Data::Stat of a path, or nil if the path does not exist

Overloads:

  • #exists(path, watch = nil) ⇒ Data::Stat

    Returns Stat of the path or nil if the path does not exist.

    Parameters:

    • path (String)
    • watch (Watcher) (defaults to: nil)

      optional exists watch to set on this path

    Returns:

    • (Data::Stat)

      Stat of the path or nil if the path does not exist

    Raises:

  • #exists(path, watch = nil) {|stat| ... } ⇒ AsyncOp

    Returns asynchronous operation.

    Yield Parameters:

    • stat (Data::Stat)

      Stat of the path or nil if the path did not exist

    Returns:

    • (AsyncOp)

      asynchronous operation



# File 'lib/zkruby/client.rb', line 351

def exists(path,watch=nil,&blk)
    return synchronous_call(:exists,path,watch)[0] unless block_given?
    path = chroot(path) 

    req = Proto::ExistsRequest.new(:path => path, :watch => watch)
    queue_request(req,:exists,3,Proto::ExistsResponse,:exists,watch,ExistsPacket) do | response |
        blk.call( response.nil? ? nil : response.stat )
    end
end
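
Since #exists returns nil for a missing path, it can guard a create. The sketch below is a hypothetical helper (`create_if_missing`), assuming `zk` is a connected client. The check and the create are not atomic, so it only reduces, and does not remove, the chance of a NODE_EXISTS error.

```ruby
# Hypothetical helper; `zk` is assumed to be a connected ZooKeeper::Client.
# Creates `path` only when #exists reports it missing and returns whether
# a create was attempted. Another client may create the node between the
# two calls, so callers should still expect a NODE_EXISTS error.
def create_if_missing(zk, path, data, acl)
  return false unless zk.exists(path).nil?
  zk.create(path, data, acl)
  true
end
```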

#get(path, watch = nil) ⇒ Data::Stat, String #get(path, watch = nil) {|stat, data| ... } ⇒ AsyncOp

Retrieve data

Overloads:

  • #get(path, watch = nil) ⇒ Data::Stat, String

    Returns stat,data at path.

    Parameters:

    • path (String)
    • watch (Watcher) (defaults to: nil)

      optional data watch to set on this path

    Returns:

    Raises:

  • #get(path, watch = nil) {|stat, data| ... } ⇒ AsyncOp

    Returns asynchronous operation.

    Yield Parameters:

    • stat (Data::Stat)

      Stat of the path

    • data (String)

      Content at path

    Returns:

    • (AsyncOp)

      asynchronous operation



# File 'lib/zkruby/client.rb', line 331

def get(path,watch=nil,&blk)
    return synchronous_call(:get,path,watch) unless block_given?
    path = chroot(path) 

    req = Proto::GetDataRequest.new(:path => path, :watch => watch)

    queue_request(req,:get,4,Proto::GetDataResponse,:data,watch) do | response |
        blk.call( response.stat, response.data.to_s)
    end
end

#get_acl(path) ⇒ Array<Data::ACL> #get_acl(path) {|list| ... } ⇒ AsyncOp

Get ACL

Overloads:

  • #get_acl(path) ⇒ Array<Data::ACL>

    Returns list of acls applying to path.

    Parameters:

    • path (String)

    Returns:

    • (Array<Data::ACL>)

      list of acls applying to path

    Raises:

  • #get_acl(path) {|list| ... } ⇒ AsyncOp

    Returns asynchronous operation.

    Yield Parameters:

    • list (Array<Data::ACL>)

      of acls applying to path

    Returns:

    • (AsyncOp)

      asynchronous operation



# File 'lib/zkruby/client.rb', line 398

def get_acl(path,&blk)
    return synchronous_call(:get_acl,path)[0] unless block_given?
    path = chroot(path) 

    req = Proto::GetACLRequest.new(:path => path)
    queue_request(req,:get_acl,6,Proto::GetACLResponse) do | response |
        blk.call( response.acl )
    end
end

#mkpath(path, acl = ZK::ACL_OPEN_UNSAFE, &callback) ⇒ Object

Recursive make path

This will send parallel creates for ALL nodes up to the root and then ignore any NODE_EXISTS errors.

You generally want to call this after receiving a NO_NODE error from a simple #create

Parameters:

  • path (String)
  • acl (Data::ACL) (defaults to: ZK::ACL_OPEN_UNSAFE)

    the access control list to apply to any created nodes

Returns:

Raises:



# File 'lib/zkruby/util.rb', line 17

def mkpath(path,acl=ZK::ACL_OPEN_UNSAFE,&callback)

    return synchronous_call(:mkpath,path,acl) unless block_given?

    connection_lost = false

    path_comp = path.split("/")

    # we only care about the result of the final create op
    last_op = nil

    (1-path_comp.length..-1).each do |i|

        sub_path = path_comp[0..i].join("/")

        op = create(sub_path,"",acl) { if i == -1 then callback.call() else :true end }

        op.errback do |err|
            if i == -1
                if ZK::Error::NODE_EXISTS === err
                    callback.call()
                elsif ZK::Error::CONNECTION_LOST === err || ( ZK::Error::NO_NODE === err && connection_lost )
                    # try again
                    mkpath(path,acl)
                    callback.call()
                else 
                    raise err 
                end
            elsif ZK::Error::CONNECTION_LOST === err
                connection_lost = true
                :connection_lost
            else
                # we don't care about any other errors, but we will log them
                logger.warn { "Error creating #{sub_path}, #{err}" }
            end
        end
        last_op = op if (i == -1)
    end

    return WrappedOp.new(last_op)
end
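
The advice to call #mkpath after a NO_NODE error from #create could be followed along these lines. `create_with_parents` is a hypothetical helper and `zk` is assumed to be a connected client. The `ZK::Error::NO_NODE === err` test mirrors the source above. That the library's errors are rescued by `StandardError` is an assumption.

```ruby
# Hypothetical helper; `zk` is assumed to be a connected ZooKeeper::Client.
# Tries a plain #create first. On a NO_NODE error it builds the parent
# path with #mkpath (synchronously, as no block is given) and retries
# the create once. Any other error is re-raised unchanged.
def create_with_parents(zk, path, data, acl)
  zk.create(path, data, acl)
rescue StandardError => err # assumption: zkruby errors are StandardErrors
  raise unless ZK::Error::NO_NODE === err
  zk.mkpath(File.dirname(path), acl)
  zk.create(path, data, acl)
end
```

Trying the simple create first keeps the common case to a single request; the parallel creates issued by #mkpath are only paid for when a parent is actually missing.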

#multi(ops, &callback) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

See #transaction



# File 'lib/zkruby/client.rb', line 459

def multi(ops,&callback)
    return synchronous_call(:multi,ops) unless block_given?

    req = Proto::MultiRequest.new()

    ops.each do |op|
        req.requests << { :header => { :_type => op.opcode, :done => false, :err=> 0 }, :request => op.request }
    end

    req.requests << { :header => { :_type => -1 , :done => true, :err => -1 } }

    logger.debug("Multi #{req}")
    queue_request(req,:multi,14,Proto::MultiResponse) do |response|
        exception = nil  
        response.responses.each_with_index() do |multi_response,index|
            next if multi_response.done?
            op = ops[index]
            if multi_response.header._type == -1
                errcode = multi_response.header.err.to_i
                if (errcode != 0)
                    exception = Error.lookup(errcode).exception("Transaction error for op ##{index} - #{op.op} (#{op.path})")
                    #TODO just raises the first exception
                    raise exception
                end
            else
                callback_args = if multi_response.has_response? then [ multi_response.response ] else [] end
                ops[index].callback.call(*callback_args)
            end
        end
    end
end

#rmpath(path, version) ⇒ Object #rmpath(path, version) { ... } ⇒ AsyncOp

Recursive delete

Although this method itself can be called synchronously, all the zk activity is asynchronous, ie all subnodes are removed in parallel.

Will retry on connection loss, or if some other activity is adding nodes in parallel. If you get a session expiry in the middle of this you will end up with a partially completed recursive delete. In all other circumstances it will eventually complete.

Overloads:

  • #rmpath(path, version) ⇒ Object

    Parameters:

    • path (String)

      this node and all its children will be recursively deleted

    • version (Fixnum)

      the expected version to be deleted (-1 to match any version). Only applies to the top level path

    Returns:

    Raises:

  • #rmpath(path, version) { ... } ⇒ AsyncOp

    Yields:

    • callback invoked if delete is successful

    Returns:



# File 'lib/zkruby/util.rb', line 78

def rmpath(path,version = -1, &callback)

    return synchronous_call(:rmpath,path,version) unless block_given?

    del_op = delete(path,version) { callback.call() }

    del_op.errback do |err|
        # we don't leave this method unless we get an exception
        # or have completed and called the callback
        case err
        when ZK::Error::NO_NODE
        when ZK::Error::CONNECTION_LOST
            rmpath(path,version)
        when ZK::Error::NOT_EMPTY

            stat, child_list = children(path)

            unless child_list.empty?
                child_ops = {}
                # errors reported by the child deletes, keyed by child path
                child_results = {}
                child_list.each do |child| 
                    child_path = "#{path}/#{child}"

                    rm_op = rmpath(child_path,-1) { :success }

                    rm_op.errback { |err| child_results[child_path] = err }

                    child_ops[child_path] = rm_op
                end

                # Wait until all our children are done (or error'd)
                child_ops.each { |child_path,op| op.value }
                rmpath(path,version)
            end
        else
            raise err
        end
        callback.call()
    end

    return WrappedOp.new(del_op)
end
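
The same start-everything-then-wait pattern that the source uses for child nodes can be applied to several independent trees. In the sketch below, `remove_trees` is a hypothetical helper and `zk` is assumed to be a connected client. That `value` blocks until the operation has finished is inferred from the comment in the source above, not confirmed by this page.

```ruby
# Hypothetical helper; `zk` is assumed to be a connected ZooKeeper::Client.
# Starts a recursive delete for every path, then waits for all of them.
# Returns a hash of the errors reported to the errbacks, keyed by path.
def remove_trees(zk, paths)
  failures = {}
  ops = paths.map do |path|
    op = zk.rmpath(path, -1) { :success }
    op.errback { |err| failures[path] = err }
    op
  end
  # Assumption: AsyncOp#value waits for the operation to complete.
  ops.each(&:value)
  failures
end
```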

#set(path, data, version) ⇒ Data::Stat #set(path, data, version) {|stat| ... } ⇒ AsyncOp

Set Data

Overloads:

  • #set(path, data, version) ⇒ Data::Stat

    Returns new stat of path (ie new version).

    Parameters:

    • path (String)
    • data (String)

      content to set at path

    • version (Fixnum)

      expected current version at path

    Returns:

    • (Data::Stat)

      new stat of path (ie new version)

    Raises:

  • #set(path, data, version) {|stat| ... } ⇒ AsyncOp

    Returns asynchronous operation.

    Yield Parameters:

    Returns:

    • (AsyncOp)

      asynchronous operation



# File 'lib/zkruby/client.rb', line 386

def set(path,data,version,&callback)
    op_set(path,data,version,&callback)
end
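
The version argument allows a read-modify-write that fails rather than overwriting a concurrent change. The sketch below is a hypothetical helper (`update_data`) and `zk` is assumed to be a connected client. It also assumes that Data::Stat exposes the node's data version as `version`, which this page does not confirm.

```ruby
# Hypothetical helper; `zk` is assumed to be a connected ZooKeeper::Client.
# Reads the current data, passes it to the block, and writes the block's
# result back, expecting the version that was read. If another client has
# changed the node in between, #set raises instead of overwriting.
def update_data(zk, path)
  stat, data = zk.get(path)
  # Assumption: Data::Stat responds to `version`.
  zk.set(path, yield(data), stat.version)
end
```

A caller that wants to retry on a version conflict can rescue the error and call the helper again, which re-reads the node.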

#set_acl(path, acl, version) ⇒ Object #set_acl(path, acl, version) {|new| ... } ⇒ AsyncOp

Set ACL

Overloads:

  • #set_acl(path, acl, version) ⇒ Object

    Returns new stat for path if successful.

    Parameters:

    • path (String)
    • acl (Array<Data::ACL>)

      list of acls for path

    • version (Fixnum)

      expected current version

    Returns:

    • (Data::Stat)

      new stat for path if successful

    Raises:

  • #set_acl(path, acl, version) {|new| ... } ⇒ AsyncOp

    Returns asynchronous operation.

    Yield Parameters:

    • new (Data::Stat)

      stat for path if successful

    Returns:

    • (AsyncOp)

      asynchronous operation



# File 'lib/zkruby/client.rb', line 418

def set_acl(path,acl,version,&blk)
    return synchronous_call(:set_acl,path,acl,version)[0] unless block_given?
    path = chroot(path) 

    req = Proto::SetACLRequest.new(:path => path, :acl => acl, :version => version)
    queue_request(req,:set_acl,7,Proto::SetACLResponse) do | response |
        blk.call( response.stat )
    end

end

#sync(path) ⇒ String #sync(path) {|path| ... } ⇒ AsyncOp

Synchronise path between session and leader

Overloads:

  • #sync(path) ⇒ String

    Returns path.

    Parameters:

    • path (String)

    Returns:

    • (String)

      path

    Raises:

  • #sync(path) {|path| ... } ⇒ AsyncOp

    Returns asynchronous operation.

    Yield Parameters:

    • path (String)

    Returns:

    • (AsyncOp)

      asynchronous operation



# File 'lib/zkruby/client.rb', line 437

def sync(path,&blk)
    return synchronous_call(:sync,path)[0] unless block_given?
    path = chroot(path) 
    req = Proto::SyncRequest.new(:path => path)
    queue_request(req,:sync,9,Proto::SyncResponse) do | response |
        blk.call(unchroot(response.path))
    end
end
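
A typical use of #sync is to issue it before a read that must see the leader's latest state. The sketch below is a hypothetical helper (`fresh_read`), assuming `zk` is a connected client.

```ruby
# Hypothetical helper; `zk` is assumed to be a connected ZooKeeper::Client.
# Synchronises `path` with the leader, then reads it. Both calls are made
# without a block, so they run synchronously and in order.
# Returns the stat and data, as #get does.
def fresh_read(zk, path)
  zk.sync(path)
  zk.get(path)
end
```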

#timeout ⇒ Object

Session timeout, initially as supplied, but once connected is the negotiated timeout with the server.



# File 'lib/zkruby/client.rb', line 269

def timeout
    @binding.session.timeout
end

#transaction ⇒ Transaction #transaction {|txn| ... } ⇒ Object

Perform multiple operations in a transaction

Overloads:

  • #transaction ⇒ Transaction

    Returns:

  • #transaction {|txn| ... } ⇒ Object

    Execute the supplied block and commit the transaction (synchronously)

    Yield Parameters:

Yields:

  • (txn)


# File 'lib/zkruby/client.rb', line 497

def transaction(&block)
    txn = Transaction.new(self,session)
    return txn unless block_given?

    yield txn
    txn.commit
end

#watcher ⇒ Object

The currently registered default watcher



# File 'lib/zkruby/client.rb', line 274

def watcher 
    @binding.session.watcher
end

#watcher=(watcher) ⇒ Object

Assign the watcher to the session. This watcher will receive session connect/disconnect/expired events as well as any path-based watches registered to the API calls using the literal value “true”.

Parameters:



# File 'lib/zkruby/client.rb', line 281

def watcher=(watcher)
    @binding.session.watcher=watcher
end