Class: Zookeeper::Continuation

Inherits:
Object
  • Object
show all
Includes:
Constants, Logger
Defined in:
lib/zookeeper/continuation.rb

Overview

sigh, slightly different than the userland callbacks, the continuation provides sync call semantics around an async api

Defined Under Namespace

Classes: Registry

Constant Summary collapse

CALLBACK_ARG_IDX =

sigh what is the index in the *args array of the ‘callback’ param

{
  :get => 2,
  :set => 3,
  :exists => 2,
  :create => 3,
  :delete => 3,
  :get_acl => 2,
  :set_acl => 3,
  :get_children => 2,
  :state => 0,
}
METH_TO_ASYNC_RESULT_KEYS =

maps the method name to the async return hash keys it should use to deliver the results

{
  :get          => [:rc, :data, :stat],
  :set          => [:rc, :stat],
  :exists       => [:rc, :stat],
  :create       => [:rc, :string],
  :delete       => [:rc],
  :get_acl      => [:rc, :acl, :stat],
  :set_acl      => [:rc],
  :get_children => [:rc, :strings, :stat],
}

Constants included from Constants

Zookeeper::Constants::CONNECTED_EVENT_VALUES, Zookeeper::Constants::EVENT_TYPE_NAMES, Zookeeper::Constants::STATE_NAMES, Zookeeper::Constants::ZAPIERROR, Zookeeper::Constants::ZAUTHFAILED, Zookeeper::Constants::ZBADARGUMENTS, Zookeeper::Constants::ZBADVERSION, Zookeeper::Constants::ZCLOSING, Zookeeper::Constants::ZCONNECTIONLOSS, Zookeeper::Constants::ZDATAINCONSISTENCY, Zookeeper::Constants::ZINVALIDACL, Zookeeper::Constants::ZINVALIDCALLBACK, Zookeeper::Constants::ZINVALIDSTATE, Zookeeper::Constants::ZKRB_ASYNC_CONTN_ID, Zookeeper::Constants::ZKRB_GLOBAL_CB_REQ, Zookeeper::Constants::ZMARSHALLINGERROR, Zookeeper::Constants::ZNOAUTH, Zookeeper::Constants::ZNOCHILDRENFOREPHEMERALS, Zookeeper::Constants::ZNODEEXISTS, Zookeeper::Constants::ZNONODE, Zookeeper::Constants::ZNOTEMPTY, Zookeeper::Constants::ZNOTHING, Zookeeper::Constants::ZOK, Zookeeper::Constants::ZOO_ASSOCIATING_STATE, Zookeeper::Constants::ZOO_AUTH_FAILED_STATE, Zookeeper::Constants::ZOO_CHANGED_EVENT, Zookeeper::Constants::ZOO_CHILD_EVENT, Zookeeper::Constants::ZOO_CLOSED_STATE, Zookeeper::Constants::ZOO_CONNECTED_STATE, Zookeeper::Constants::ZOO_CONNECTING_STATE, Zookeeper::Constants::ZOO_CREATED_EVENT, Zookeeper::Constants::ZOO_DELETED_EVENT, Zookeeper::Constants::ZOO_EPHEMERAL, Zookeeper::Constants::ZOO_EXPIRED_SESSION_STATE, Zookeeper::Constants::ZOO_LOG_LEVEL_DEBUG, Zookeeper::Constants::ZOO_LOG_LEVEL_ERROR, Zookeeper::Constants::ZOO_LOG_LEVEL_INFO, Zookeeper::Constants::ZOO_LOG_LEVEL_WARN, Zookeeper::Constants::ZOO_NOTWATCHING_EVENT, Zookeeper::Constants::ZOO_SEQUENCE, Zookeeper::Constants::ZOO_SESSION_EVENT, Zookeeper::Constants::ZOPERATIONTIMEOUT, Zookeeper::Constants::ZRUNTIMEINCONSISTENCY, Zookeeper::Constants::ZSESSIONEXPIRED, Zookeeper::Constants::ZSESSIONMOVED, Zookeeper::Constants::ZSYSTEMERROR, Zookeeper::Constants::ZUNIMPLEMENTED

Constants included from ACLs::Constants

ACLs::Constants::ZOO_ANYONE_ID_UNSAFE, ACLs::Constants::ZOO_AUTH_IDS, ACLs::Constants::ZOO_CREATOR_ALL_ACL, ACLs::Constants::ZOO_OPEN_ACL_UNSAFE, ACLs::Constants::ZOO_PERM_ADMIN, ACLs::Constants::ZOO_PERM_ALL, ACLs::Constants::ZOO_PERM_CREATE, ACLs::Constants::ZOO_PERM_DELETE, ACLs::Constants::ZOO_PERM_READ, ACLs::Constants::ZOO_PERM_WRITE, ACLs::Constants::ZOO_READ_ACL_UNSAFE

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logger

included, set_default

Methods included from Constants

#event_by_value, #state_by_value

Constructor Details

#initialize(meth, *args) ⇒ Continuation

Returns a new instance of Continuation.



79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/zookeeper/continuation.rb', line 79

def initialize(meth, *args)
  @meth   = meth
  @args   = args
  @mutex  = Mutex.new
  @cond   = ConditionVariable.new
  @rval   = nil

  # make this error reporting more robust if necessary, right now, just set to state
  @error  = nil
  
  # set to true when an event occurs that would cause the caller to
  # otherwise block forever
  @interrupt = false
end

Instance Attribute Details

#blockObject

Returns the value of attribute block.



77
78
79
# File 'lib/zookeeper/continuation.rb', line 77

def block
  @block
end

#methObject

Returns the value of attribute meth.



77
78
79
# File 'lib/zookeeper/continuation.rb', line 77

def meth
  @meth
end

#rvalObject

Returns the value of attribute rval.



77
78
79
# File 'lib/zookeeper/continuation.rb', line 77

def rval
  @rval
end

Instance Method Details

#call(hash) ⇒ Object

receive the response from the server, set @rval, notify caller



118
119
120
121
122
123
# File 'lib/zookeeper/continuation.rb', line 118

def call(hash)
  logger.debug { "continuation req_id #{req_id}, got hash: #{hash.inspect}" }
  @rval = hash.values_at(*METH_TO_ASYNC_RESULT_KEYS.fetch(meth))
  logger.debug { "delivering result #{@rval.inspect}" }
  deliver!
end

#req_idObject



157
158
159
# File 'lib/zookeeper/continuation.rb', line 157

def req_id
  @args.first
end

#state_call?Boolean

Returns:

  • (Boolean)


161
162
163
# File 'lib/zookeeper/continuation.rb', line 161

def state_call?
  @meth == :state
end

#submit(czk) ⇒ Object

this method is called by the event thread to submit the request passed the CZookeeper instance, makes the async call and deals with the results

BTW: in case you were wondering this is a completely stupid implementation, but it’s more important to get something working and passing specs, then refactor to make everything sane



137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/zookeeper/continuation.rb', line 137

def submit(czk)
  state = czk.zkrb_state    # check the state of the connection

  if @meth == :state        # if the method is a state call
    @rval = [state]         # we're done, no error
    return deliver!

  elsif state != ZOO_CONNECTED_STATE  # otherwise, we must be connected
    @error = state                    # so set the error
    return deliver!                   # and we're out
  end

  rc, *_ = czk.__send__(:"zkrb_#{@meth}", *async_args)
  
  if user_callback? or (rc != ZOK)  # async call, or we failed to submit it
    @rval = [rc]                    # create the repsonse
    deliver!                        # wake the caller and we're out
  end
end

#user_callback?Boolean

Returns:

  • (Boolean)


125
126
127
# File 'lib/zookeeper/continuation.rb', line 125

def user_callback?
  !!@args.at(callback_arg_idx)
end

#valueObject

the caller calls this method and receives the response from the async loop



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/zookeeper/continuation.rb', line 95

def value
  @mutex.synchronize do
    @cond.wait(@mutex) until @rval or @error

    case @error
    when nil
      # ok, nothing to see here, carry on
    when ZOO_EXPIRED_SESSION_STATE
      raise Exceptions::SessionExpired, "connection has expired"
    else
      raise Exceptions::NotConnected, "connection state is #{STATE_NAMES[@error]}"
    end

    case @rval.length
    when 1
      return @rval.first
    else
      return @rval
    end
  end
end