Class: Zookeeper::Continuation
- Inherits: Object
  - Object
  - Zookeeper::Continuation
- Defined in: lib/zookeeper/continuation.rb
Overview
Slightly different from the userland callbacks: the continuation provides synchronous call semantics around an asynchronous API.
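The general idea of blocking a caller until an asynchronous result arrives can be sketched as below. This is a minimal illustration under stated assumptions, not the library's code: TinyContinuation and its methods are hypothetical names, and the worker thread stands in for the event thread that would deliver the server response.

```ruby
require 'monitor'

# Hypothetical stand-in for a continuation (not Zookeeper::Continuation itself).
class TinyContinuation
  def initialize
    @mutex = Monitor.new
    @cond  = @mutex.new_cond
    @rval  = nil
  end

  # Called from the asynchronous side to hand over the result.
  def call(result)
    @mutex.synchronize do
      @rval = result
      @cond.broadcast # wake any thread blocked in #value
    end
  end

  # Called from the requesting thread; blocks until a result has been set.
  def value
    @mutex.synchronize do
      @cond.wait_until { !@rval.nil? }
      @rval
    end
  end
end

cont   = TinyContinuation.new
worker = Thread.new { sleep 0.05; cont.call([0, "data"]) }
result = cont.value # blocks here until the worker calls cont.call
worker.join
```

The caller sees an ordinary return value even though the result was produced on another thread. The real class adds a timeout and error states on top of this pattern.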
Defined Under Namespace
Classes: Registry
Constant Summary
- OPERATION_TIMEOUT =
  seconds
    30
- CALLBACK_ARG_IDX =
  The index in the *args array of the 'callback' param, for each method.
    {
      :get => 2,
      :set => 3,
      :exists => 2,
      :create => 3,
      :delete => 3,
      :get_acl => 2,
      :set_acl => 3,
      :get_children => 2,
      :state => 0,
    }
- METH_TO_ASYNC_RESULT_KEYS =
  Maps the method name to the async return hash keys it should use to deliver the results.
    {
      :get => [:rc, :data, :stat],
      :set => [:rc, :stat],
      :exists => [:rc, :stat],
      :create => [:rc, :string],
      :delete => [:rc],
      :get_acl => [:rc, :acl, :stat],
      :set_acl => [:rc],
      :get_children => [:rc, :strings, :stat],
    }
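As a rough illustration of how such a key map can turn an async result hash into an ordered return value, the sketch below uses Hash#values_at, the same call that #call uses further down this page. RESULT_KEYS_SUBSET copies two entries from METH_TO_ASYNC_RESULT_KEYS; the contents of async_hash (including the extra :req_id key and the :stat_placeholder value) are assumptions made up for the example.

```ruby
# Two entries copied from METH_TO_ASYNC_RESULT_KEYS, kept local to this sketch.
RESULT_KEYS_SUBSET = {
  :get    => [:rc, :data, :stat],
  :delete => [:rc],
}

# Hypothetical async result hash; keys not listed for a method are ignored.
async_hash = { :req_id => 7, :rc => 0, :data => "hello", :stat => :stat_placeholder }

# Pick the values in the order the map prescribes for each method.
get_rval    = async_hash.values_at(*RESULT_KEYS_SUBSET.fetch(:get))
delete_rval = async_hash.values_at(*RESULT_KEYS_SUBSET.fetch(:delete))
```

Here get_rval holds three values in the order rc, data, stat, while delete_rval is a one-element array. #value returns a one-element result as the bare element rather than as an array.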
Constants included from Constants
Zookeeper::Constants::CONNECTED_EVENT_VALUES, Zookeeper::Constants::EVENT_TYPE_NAMES, Zookeeper::Constants::STATE_NAMES, Zookeeper::Constants::ZAPIERROR, Zookeeper::Constants::ZAUTHFAILED, Zookeeper::Constants::ZBADARGUMENTS, Zookeeper::Constants::ZBADVERSION, Zookeeper::Constants::ZCLOSING, Zookeeper::Constants::ZCONNECTIONLOSS, Zookeeper::Constants::ZDATAINCONSISTENCY, Zookeeper::Constants::ZINVALIDACL, Zookeeper::Constants::ZINVALIDCALLBACK, Zookeeper::Constants::ZINVALIDSTATE, Zookeeper::Constants::ZKRB_ASYNC_CONTN_ID, Zookeeper::Constants::ZKRB_GLOBAL_CB_REQ, Zookeeper::Constants::ZMARSHALLINGERROR, Zookeeper::Constants::ZNOAUTH, Zookeeper::Constants::ZNOCHILDRENFOREPHEMERALS, Zookeeper::Constants::ZNODEEXISTS, Zookeeper::Constants::ZNONODE, Zookeeper::Constants::ZNOTEMPTY, Zookeeper::Constants::ZNOTHING, Zookeeper::Constants::ZOK, Zookeeper::Constants::ZOO_ASSOCIATING_STATE, Zookeeper::Constants::ZOO_AUTH_FAILED_STATE, Zookeeper::Constants::ZOO_CHANGED_EVENT, Zookeeper::Constants::ZOO_CHILD_EVENT, Zookeeper::Constants::ZOO_CLOSED_STATE, Zookeeper::Constants::ZOO_CONNECTED_STATE, Zookeeper::Constants::ZOO_CONNECTING_STATE, Zookeeper::Constants::ZOO_CREATED_EVENT, Zookeeper::Constants::ZOO_DELETED_EVENT, Zookeeper::Constants::ZOO_EPHEMERAL, Zookeeper::Constants::ZOO_EXPIRED_SESSION_STATE, Zookeeper::Constants::ZOO_LOG_LEVEL_DEBUG, Zookeeper::Constants::ZOO_LOG_LEVEL_ERROR, Zookeeper::Constants::ZOO_LOG_LEVEL_INFO, Zookeeper::Constants::ZOO_LOG_LEVEL_WARN, Zookeeper::Constants::ZOO_NOTWATCHING_EVENT, Zookeeper::Constants::ZOO_SEQUENCE, Zookeeper::Constants::ZOO_SESSION_EVENT, Zookeeper::Constants::ZOPERATIONTIMEOUT, Zookeeper::Constants::ZRUNTIMEINCONSISTENCY, Zookeeper::Constants::ZSESSIONEXPIRED, Zookeeper::Constants::ZSESSIONMOVED, Zookeeper::Constants::ZSYSTEMERROR, Zookeeper::Constants::ZUNIMPLEMENTED
Constants included from ACLs::Constants
ACLs::Constants::ZOO_ANYONE_ID_UNSAFE, ACLs::Constants::ZOO_AUTH_IDS, ACLs::Constants::ZOO_CREATOR_ALL_ACL, ACLs::Constants::ZOO_OPEN_ACL_UNSAFE, ACLs::Constants::ZOO_PERM_ADMIN, ACLs::Constants::ZOO_PERM_ALL, ACLs::Constants::ZOO_PERM_CREATE, ACLs::Constants::ZOO_PERM_DELETE, ACLs::Constants::ZOO_PERM_READ, ACLs::Constants::ZOO_PERM_WRITE, ACLs::Constants::ZOO_READ_ACL_UNSAFE
Instance Attribute Summary
- #args ⇒ Object (readonly)
  Returns the value of attribute args.
- #block ⇒ Object
  Returns the value of attribute block.
- #meth ⇒ Object
  Returns the value of attribute meth.
- #rval ⇒ Object
  Returns the value of attribute rval.
Instance Method Summary
- #call(hash) ⇒ Object
  receive the response from the server, set @rval, notify caller.
- #initialize(meth, *args) ⇒ Continuation (constructor)
  A new instance of Continuation.
- #req_id ⇒ Object
- #shutdown! ⇒ Object
  interrupt the sleeping thread with a NotConnected error.
- #state_call? ⇒ Boolean
- #submit(czk) ⇒ Object
  This method is called by the event thread to submit the request. It is passed the CZookeeper instance, makes the async call, and deals with the results.
- #user_callback? ⇒ Boolean
- #value ⇒ Object
  The caller calls this method and receives the response from the async loop. This method has a hard-coded 30 second timeout as a safety feature.
Methods included from Logger
Methods included from Constants
#event_by_value, #state_by_value
Constructor Details
#initialize(meth, *args) ⇒ Continuation
Returns a new instance of Continuation.
    # File 'lib/zookeeper/continuation.rb', line 83
    def initialize(meth, *args)
      @meth = meth
      @args = args.freeze
      @mutex = Monitor.new
      @cond = @mutex.new_cond
      @rval = nil

      # make this error reporting more robust if necessary, right now, just set to state
      @error = nil
    end
Instance Attribute Details
#args ⇒ Object (readonly)
Returns the value of attribute args.
    # File 'lib/zookeeper/continuation.rb', line 81
    def args
      @args
    end
#block ⇒ Object
Returns the value of attribute block.
    # File 'lib/zookeeper/continuation.rb', line 79
    def block
      @block
    end
#meth ⇒ Object
Returns the value of attribute meth.
    # File 'lib/zookeeper/continuation.rb', line 79
    def meth
      @meth
    end
#rval ⇒ Object
Returns the value of attribute rval.
    # File 'lib/zookeeper/continuation.rb', line 79
    def rval
      @rval
    end
Instance Method Details
#call(hash) ⇒ Object
receive the response from the server, set @rval, notify caller
    # File 'lib/zookeeper/continuation.rb', line 139
    def call(hash)
      logger.debug { "continuation req_id #{req_id}, got hash: #{hash.inspect}" }
      @rval = hash.values_at(*METH_TO_ASYNC_RESULT_KEYS.fetch(meth))
      logger.debug { "delivering result #{@rval.inspect}" }
      deliver!
    end
#req_id ⇒ Object
    # File 'lib/zookeeper/continuation.rb', line 178
    def req_id
      @args.first
    end
#shutdown! ⇒ Object
interrupt the sleeping thread with a NotConnected error
    # File 'lib/zookeeper/continuation.rb', line 187
    def shutdown!
      @mutex.synchronize do
        return if @rval or @error
        @error = :shutdown
        @cond.broadcast
      end
    end
#state_call? ⇒ Boolean
    # File 'lib/zookeeper/continuation.rb', line 182
    def state_call?
      @meth == :state
    end
#submit(czk) ⇒ Object
This method is called by the event thread to submit the request. It is passed the CZookeeper instance, makes the async call, and deals with the results.
BTW: in case you were wondering this is a completely stupid implementation, but it’s more important to get something working and passing specs, then refactor to make everything sane
    # File 'lib/zookeeper/continuation.rb', line 158
    def submit(czk)
      state = czk.zkrb_state              # check the state of the connection

      if @meth == :state                  # if the method is a state call
        @rval = [state]                   # we're done, no error
        return deliver!

      elsif state != ZOO_CONNECTED_STATE  # otherwise, we must be connected
        @error = state                    # so set the error
        return deliver!                   # and we're out
      end

      rc, *_ = czk.__send__(:"zkrb_#{@meth}", *async_args)

      if user_callback? or (rc != ZOK)    # async call, or we failed to submit it
        @rval = [rc]                      # create the response
        deliver!                          # wake the caller and we're out
      end
    end
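The branching in #submit can be summarized as a small pure function. This is only a sketch for reading the control flow: submit_outcome, SKETCH_CONNECTED_STATE and SKETCH_OK are hypothetical names defined locally (stand-ins for ZOO_CONNECTED_STATE and ZOK, with values assumed for the example), and the block stands in for the zkrb_* call on the CZookeeper instance.

```ruby
SKETCH_CONNECTED_STATE = 3 # local stand-in for ZOO_CONNECTED_STATE (assumed value)
SKETCH_OK              = 0 # local stand-in for ZOK (assumed value)

# Returns one of:
#   [:deliver, rval]  the caller is woken immediately with rval
#   [:error, state]   the caller is woken and will see a connection error
#   [:wait, nil]      the request was submitted; #call delivers the result later
# The block performs the (hypothetical) async submission and returns its rc.
def submit_outcome(meth, state, user_callback)
  return [:deliver, [state]] if meth == :state           # state calls never hit the server
  return [:error, state] unless state == SKETCH_CONNECTED_STATE

  rc = yield
  if user_callback || rc != SKETCH_OK
    [:deliver, [rc]] # async call with a user callback, or submission failed
  else
    [:wait, nil]
  end
end

state_call   = submit_outcome(:state, 1, false) { raise "not reached" }
disconnected = submit_outcome(:get, 1, false)   { raise "not reached" }
pending      = submit_outcome(:get, 3, false)   { 0 }
async_call   = submit_outcome(:get, 3, true)    { 0 }
failed       = submit_outcome(:get, 3, false)   { -4 }
```

Only the pending case leaves the caller blocked in #value; every other path wakes it straight away.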
#user_callback? ⇒ Boolean
    # File 'lib/zookeeper/continuation.rb', line 146
    def user_callback?
      !!@args.at(callback_arg_idx)
    end
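The callback_arg_idx helper is not shown on this page. Assuming it looks up the method's entry in CALLBACK_ARG_IDX, the check could be sketched as follows. CALLBACK_IDX_SUBSET and callback_given? are hypothetical names, and the argument layout (request id, path, callback, watcher) is an assumption made for illustration.

```ruby
# Two entries copied from CALLBACK_ARG_IDX, kept local to this sketch.
CALLBACK_IDX_SUBSET = { :get => 2, :set => 3 }

# True when the slot reserved for the callback holds something truthy.
def callback_given?(meth, args)
  !!args.at(CALLBACK_IDX_SUBSET.fetch(meth))
end

noop = lambda { |result| result }

# Assumed layout for :get: [req_id, path, callback, watcher]
with_cb    = callback_given?(:get, [1, "/node", noop, nil])
without_cb = callback_given?(:get, [1, "/node", nil, nil])
```

Array#at returns nil for an index past the end of the array, so a short argument list is treated the same as a missing callback.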
#value ⇒ Object
The caller calls this method and receives the response from the async loop. This method has a hard-coded 30 second timeout as a safety feature. No call should take more than 20s (as the session timeout is set to 20s), so if any call takes longer than that, something has gone horribly wrong.
    # File 'lib/zookeeper/continuation.rb', line 101
    def value
      time_to_stop = Time.now + OPERATION_TIMEOUT
      now = nil

      @mutex.synchronize do
        while true
          now = Time.now
          break if @rval or @error or (now > time_to_stop)

          deadline = time_to_stop.to_f - now.to_f
          @cond.wait(deadline)
        end

        if (now > time_to_stop) and !@rval and !@error
          raise Exceptions::ContinuationTimeoutError, "response for meth: #{meth.inspect}, args: #{@args.inspect}, not received within #{OPERATION_TIMEOUT} seconds"
        end

        case @error
        when nil
          # ok, nothing to see here, carry on
        when :shutdown
          raise Exceptions::NotConnected, "the connection is shutting down"
        when ZOO_EXPIRED_SESSION_STATE
          raise Exceptions::SessionExpired, "connection has expired"
        else
          raise Exceptions::NotConnected, "connection state is #{STATE_NAMES[@error]}"
        end

        case @rval.length
        when 1
          return @rval.first
        else
          return @rval
        end
      end
    end
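The deadline loop, the shutdown wake-up, and the single-value unwrapping can be tried in isolation with a small stand-in class. This is a sketch under stated assumptions rather than the library's implementation: TimedWaiter, its nested error classes, and the configurable timeout are all hypothetical, introduced so the example finishes quickly.

```ruby
require 'monitor'

# Hypothetical stand-in that mirrors the wait/timeout/shutdown logic of #value.
class TimedWaiter
  class TimeoutError  < StandardError; end
  class ShutdownError < StandardError; end

  def initialize(timeout)
    @timeout = timeout
    @mutex   = Monitor.new
    @cond    = @mutex.new_cond
    @rval    = nil
    @error   = nil
  end

  # Set the result and wake the waiting thread.
  def deliver(rval)
    @mutex.synchronize { @rval = rval; @cond.broadcast }
  end

  # Wake the waiting thread with an error, unless it already has an outcome.
  def shutdown!
    @mutex.synchronize do
      return if @rval || @error
      @error = :shutdown
      @cond.broadcast
    end
  end

  def value
    time_to_stop = Time.now + @timeout
    @mutex.synchronize do
      until @rval || @error
        remaining = time_to_stop - Time.now
        raise TimeoutError, "no response within #{@timeout} seconds" if remaining <= 0
        @cond.wait(remaining) # may wake early, so the loop re-checks
      end
      raise ShutdownError, "shutting down" if @error == :shutdown
      @rval.length == 1 ? @rval.first : @rval
    end
  end
end

# Nothing is ever delivered: the wait gives up after the timeout.
timeout_outcome = begin
  TimedWaiter.new(0.05).value
rescue TimedWaiter::TimeoutError
  :timeout
end

# Another thread shuts the waiter down while it is blocked.
stopped = TimedWaiter.new(5)
stopper = Thread.new { sleep 0.05; stopped.shutdown! }
shutdown_outcome = begin
  stopped.value
rescue TimedWaiter::ShutdownError
  :shutdown
end
stopper.join

# A one-element result is returned as the bare element.
ready = TimedWaiter.new(5)
ready.deliver([0])
single = ready.value
```

Recomputing the remaining time on every pass keeps the overall deadline fixed even if the condition variable wakes up early.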