Class: Zookeeper::Continuation

Inherits:
Object
  • Object
show all
Includes:
Constants, Logger
Defined in:
lib/zookeeper/continuation.rb

Overview

Slightly different from the userland callbacks: the continuation provides synchronous call semantics around an async API.

Defined Under Namespace

Classes: Registry

Constant Summary collapse

OPERATION_TIMEOUT =

seconds

30
CALLBACK_ARG_IDX =

For each method, the index in the *args array of the ‘callback’ param.

{
  # operation name => zero-based position of the 'callback' param within
  # that operation's *args array
  :get => 2,
  :set => 3,
  :exists => 2,
  :create => 3,
  :delete => 3,
  :get_acl => 2,
  :set_acl => 3,
  :get_children => 2,
  :state => 0,
}
METH_TO_ASYNC_RESULT_KEYS =

maps the method name to the async return hash keys it should use to deliver the results

{
  # operation name => keys read, in this order, from the async response hash;
  # #call passes them to Hash#values_at to build the result array.
  # NOTE: there is no :state entry; #submit answers :state directly without
  # making an async call.
  :get          => [:rc, :data, :stat],
  :set          => [:rc, :stat],
  :exists       => [:rc, :stat],
  :create       => [:rc, :string],
  :delete       => [:rc],
  :get_acl      => [:rc, :acl, :stat],
  :set_acl      => [:rc],
  :get_children => [:rc, :strings, :stat],
}

Constants included from Constants

Zookeeper::Constants::CONNECTED_EVENT_VALUES, Zookeeper::Constants::EVENT_TYPE_NAMES, Zookeeper::Constants::STATE_NAMES, Zookeeper::Constants::ZAPIERROR, Zookeeper::Constants::ZAUTHFAILED, Zookeeper::Constants::ZBADARGUMENTS, Zookeeper::Constants::ZBADVERSION, Zookeeper::Constants::ZCLOSING, Zookeeper::Constants::ZCONNECTIONLOSS, Zookeeper::Constants::ZDATAINCONSISTENCY, Zookeeper::Constants::ZINVALIDACL, Zookeeper::Constants::ZINVALIDCALLBACK, Zookeeper::Constants::ZINVALIDSTATE, Zookeeper::Constants::ZKRB_ASYNC_CONTN_ID, Zookeeper::Constants::ZKRB_GLOBAL_CB_REQ, Zookeeper::Constants::ZMARSHALLINGERROR, Zookeeper::Constants::ZNOAUTH, Zookeeper::Constants::ZNOCHILDRENFOREPHEMERALS, Zookeeper::Constants::ZNODEEXISTS, Zookeeper::Constants::ZNONODE, Zookeeper::Constants::ZNOTEMPTY, Zookeeper::Constants::ZNOTHING, Zookeeper::Constants::ZOK, Zookeeper::Constants::ZOO_ASSOCIATING_STATE, Zookeeper::Constants::ZOO_AUTH_FAILED_STATE, Zookeeper::Constants::ZOO_CHANGED_EVENT, Zookeeper::Constants::ZOO_CHILD_EVENT, Zookeeper::Constants::ZOO_CLOSED_STATE, Zookeeper::Constants::ZOO_CONNECTED_STATE, Zookeeper::Constants::ZOO_CONNECTING_STATE, Zookeeper::Constants::ZOO_CREATED_EVENT, Zookeeper::Constants::ZOO_DELETED_EVENT, Zookeeper::Constants::ZOO_EPHEMERAL, Zookeeper::Constants::ZOO_EXPIRED_SESSION_STATE, Zookeeper::Constants::ZOO_LOG_LEVEL_DEBUG, Zookeeper::Constants::ZOO_LOG_LEVEL_ERROR, Zookeeper::Constants::ZOO_LOG_LEVEL_INFO, Zookeeper::Constants::ZOO_LOG_LEVEL_WARN, Zookeeper::Constants::ZOO_NOTWATCHING_EVENT, Zookeeper::Constants::ZOO_SEQUENCE, Zookeeper::Constants::ZOO_SESSION_EVENT, Zookeeper::Constants::ZOPERATIONTIMEOUT, Zookeeper::Constants::ZRUNTIMEINCONSISTENCY, Zookeeper::Constants::ZSESSIONEXPIRED, Zookeeper::Constants::ZSESSIONMOVED, Zookeeper::Constants::ZSYSTEMERROR, Zookeeper::Constants::ZUNIMPLEMENTED

Constants included from ACLs::Constants

ACLs::Constants::ZOO_ANYONE_ID_UNSAFE, ACLs::Constants::ZOO_AUTH_IDS, ACLs::Constants::ZOO_CREATOR_ALL_ACL, ACLs::Constants::ZOO_OPEN_ACL_UNSAFE, ACLs::Constants::ZOO_PERM_ADMIN, ACLs::Constants::ZOO_PERM_ALL, ACLs::Constants::ZOO_PERM_CREATE, ACLs::Constants::ZOO_PERM_DELETE, ACLs::Constants::ZOO_PERM_READ, ACLs::Constants::ZOO_PERM_WRITE, ACLs::Constants::ZOO_READ_ACL_UNSAFE

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logger

included, set_default

Methods included from Constants

#event_by_value, #state_by_value

Constructor Details

#initialize(meth, *args) ⇒ Continuation

Returns a new instance of Continuation.



83
84
85
86
87
88
89
90
91
92
# File 'lib/zookeeper/continuation.rb', line 83

# Builds a continuation for a single operation.
#
# @param meth [Symbol] name of the operation (e.g. :get, :state)
# @param args [Array] remaining arguments for the operation; stored frozen
def initialize(meth, *args)
  # Request identity: the operation name and its immutable argument list.
  @meth, @args = meth, args.freeze

  # Outcome slots; both stay nil until a result or a failure is recorded.
  # make this error reporting more robust if necessary, right now, just set to state
  @rval = @error = nil

  # Lock guarding the outcome slots, plus the condition variable that
  # #value waits on and #shutdown! broadcasts to.
  @mutex = Monitor.new
  @cond  = @mutex.new_cond
end

Instance Attribute Details

#argsObject (readonly)

Returns the value of attribute args.



81
82
83
# File 'lib/zookeeper/continuation.rb', line 81

# Returns the argument list captured by the constructor. The constructor
# freezes the array, so it cannot be modified through this reader.
def args
  @args
end

#blockObject

Returns the value of attribute block.



79
80
81
# File 'lib/zookeeper/continuation.rb', line 79

# Returns @block.
# NOTE(review): nothing in the visible code assigns @block; presumably it is
# set through a writer defined elsewhere in the class -- confirm.
def block
  @block
end

#methObject

Returns the value of attribute meth.



79
80
81
# File 'lib/zookeeper/continuation.rb', line 79

# Returns the operation name given to the constructor. It is the lookup key
# into METH_TO_ASYNC_RESULT_KEYS and is used to build the zkrb_<meth> call
# in #submit.
def meth
  @meth
end

#rvalObject

Returns the value of attribute rval.



79
80
81
# File 'lib/zookeeper/continuation.rb', line 79

# Returns the result array, or nil while no result has been recorded
# (the constructor initialises it to nil; #call and #submit fill it in).
def rval
  @rval
end

Instance Method Details

#call(hash) ⇒ Object

receive the response from the server, set @rval, notify caller



139
140
141
142
143
144
# File 'lib/zookeeper/continuation.rb', line 139

# Receives the response from the server, stores the fields relevant to this
# operation in @rval, and notifies the caller through deliver!.
#
# @param hash [Hash] the async response; the keys listed for `meth` in
#   METH_TO_ASYNC_RESULT_KEYS are extracted in table order
# @return whatever deliver! returns
# @raise [KeyError] if `meth` has no entry in METH_TO_ASYNC_RESULT_KEYS
def call(hash)
  logger.debug { "continuation req_id #{req_id}, got hash: #{hash.inspect}" }

  result_keys = METH_TO_ASYNC_RESULT_KEYS.fetch(meth)
  @rval = hash.values_at(*result_keys)

  logger.debug { "delivering result #{@rval.inspect}" }
  deliver!
end

#req_idObject



178
179
180
# File 'lib/zookeeper/continuation.rb', line 178

# Returns the request id, which is the first element of the argument list
# (nil when the list is empty).
def req_id
  @args[0]
end

#shutdown!Object

interrupt the sleeping thread with a NotConnected error



187
188
189
190
191
192
193
# File 'lib/zookeeper/continuation.rb', line 187

# Interrupts the sleeping thread: records :shutdown as the error and wakes
# every waiter on the condition variable. #value turns that error into
# Exceptions::NotConnected.
#
# Does nothing (and returns nil) when a result or an error is already set.
def shutdown!
  @mutex.synchronize do
    unless @rval || @error
      @error = :shutdown
      @cond.broadcast
    end
  end
end

#state_call?Boolean

Returns:

  • (Boolean)


182
183
184
# File 'lib/zookeeper/continuation.rb', line 182

# Tells whether this continuation is for the :state operation.
#
# @return [Boolean]
def state_call?
  @meth.equal?(:state)
end

#submit(czk) ⇒ Object

This method is called by the event thread to submit the request. It is passed the CZookeeper instance, makes the async call, and deals with the results.

BTW: in case you were wondering this is a completely stupid implementation, but it’s more important to get something working and passing specs, then refactor to make everything sane



158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/zookeeper/continuation.rb', line 158

# Submits the request using the given CZookeeper instance and deals with
# whatever can be resolved immediately.
#
# * A :state request is answered at once with the current connection state.
# * Any other request requires ZOO_CONNECTED_STATE; otherwise the state is
#   recorded as the error and the caller is woken.
# * Otherwise zkrb_<meth> is invoked with async_args. The caller is woken
#   right away only if the user supplied a callback or the submission
#   returned something other than ZOK.
#
# @param czk [Object] must respond to zkrb_state and zkrb_<meth>
# @return the value of deliver! when a result or error was delivered,
#   otherwise nil
def submit(czk)
  conn_state = czk.zkrb_state # check the state of the connection

  # a state call needs nothing more: we're done, no error
  if @meth == :state
    @rval = [conn_state]
    return deliver!
  end

  # everything else must be connected; if not, the state becomes the error
  unless conn_state == ZOO_CONNECTED_STATE
    @error = conn_state
    return deliver!
  end

  rc, = czk.__send__(:"zkrb_#{@meth}", *async_args)

  # nothing to deliver yet unless this is an async call (user callback) or
  # we failed to submit it
  return unless user_callback? || rc != ZOK

  @rval = [rc] # create the response
  deliver!     # wake the caller and we're out
end

#user_callback?Boolean

Returns:

  • (Boolean)


146
147
148
# File 'lib/zookeeper/continuation.rb', line 146

# Tells whether the caller supplied a callback, i.e. whether the slot at
# callback_arg_idx in the argument list holds anything other than nil/false.
#
# @return [Boolean]
def user_callback?
  callback = @args.at(callback_arg_idx)
  callback ? true : false
end

#valueObject

The caller calls this method and receives the response from the async loop. This method has a hard-coded 30-second timeout as a safety feature. No call should take more than 20s (as the session timeout is set to 20s), so if any call takes longer than that, something has gone horribly wrong.

Raises:

  • (ContinuationTimeoutError)

    if a response is not received within 30s



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# File 'lib/zookeeper/continuation.rb', line 101

# Blocks the calling thread until a result or an error has been recorded,
# or until OPERATION_TIMEOUT seconds have passed, then reports the outcome.
#
# @return the single element when the result array holds exactly one entry,
#   otherwise the whole result array
# @raise [Exceptions::ContinuationTimeoutError] if neither a result nor an
#   error arrived within OPERATION_TIMEOUT seconds
# @raise [Exceptions::SessionExpired] if the recorded error is
#   ZOO_EXPIRED_SESSION_STATE
# @raise [Exceptions::NotConnected] for :shutdown or any other recorded error
def value
  deadline = Time.now + OPERATION_TIMEOUT

  @mutex.synchronize do
    current = Time.now

    # Re-check after every wakeup: the timed wait can return before an
    # outcome has been recorded.
    until @rval || @error || current > deadline
      @cond.wait(deadline.to_f - current.to_f)
      current = Time.now
    end

    if current > deadline && !@rval && !@error
      raise Exceptions::ContinuationTimeoutError, "response for meth: #{meth.inspect}, args: #{@args.inspect}, not received within #{OPERATION_TIMEOUT} seconds"
    end

    case @error
    when nil
      # no failure recorded, fall through to the result
    when :shutdown
      raise Exceptions::NotConnected, "the connection is shutting down"
    when ZOO_EXPIRED_SESSION_STATE
      raise Exceptions::SessionExpired, "connection has expired"
    else
      raise Exceptions::NotConnected, "connection state is #{STATE_NAMES[@error]}"
    end

    # unwrap single-element results
    @rval.length == 1 ? @rval.first : @rval
  end
end