Class: ZK::Client::Threaded

Inherits:
Base
  • Object
show all
Includes:
Conveniences, StateMixin, Unixisms, Logging
Defined in:
lib/zk/client/threaded.rb

Overview

This is the default client that ZK will use. In the zk-eventmachine gem, there is an Evented client.

If you want to register on_* callbacks (see ZK::Client::StateMixin) then you should pass a block, which will be called before the connection is set up (this way you can get the on_connected event). See the 'Register on_connected callback' example.

A note on event delivery. There has been some confusion, caused by incorrect documentation (which I'm very sorry about), about how many threads are delivering events. The documentation for 0.9.0 was incorrect in stating the number of threads used to deliver events. There was one, unconfigurable, event dispatch thread. In 1.0 the number of event delivery threads is configurable, but still defaults to 1.

If you use the threadpool/event callbacks to perform work, you may be interested in registering an on_exception callback that will receive all exceptions that occur on the threadpool that are not handled (i.e. that bubble up to top of a block).

Examples:

Register on_connected callback.


# the nice thing about this pattern is that in the case of a call to #reopen
# all your watches will be re-established

ZK::Client::Threaded.new('localhsot:2181') do |zk|
  # do not do anything in here except register callbacks

  zk.on_connected do |event|
    zk.stat('/foo/bar', watch: true)
    zk.stat('/baz', watch: true)
  end
end

Constant Summary collapse

DEFAULT_THREADPOOL_SIZE =
1

Instance Attribute Summary

Attributes inherited from Base

#event_handler

Instance Method Summary collapse

Methods included from Conveniences

#defer, #election_candidate, #election_observer, #locker, #queue, #shared_locker, #with_lock

Methods included from Unixisms

#block_until_node_deleted, #find, #mkdir_p, #rm_rf

Methods included from StateMixin

#associating?, #connected?, #connecting?, #expired_session?, #on_connected, #on_connecting, #on_expired_session, #state, #wrap_state_closed_error

Methods inherited from Base

#children, #closed?, #create, #delete, #event_dispatch_thread?, #exists?, #get, #get_acl, #register, #safe_session_id, #session_id, #session_passwd, #set, #set_acl, #stat, #watcher

Constructor Details

#initialize(host, opts = {}) {|self| ... } ⇒ Threaded

Note:

The :timeout argument here is not the session_timeout for the connection. rather it is the amount of time we wait for the connection to be established. The session timeout exchanged with the server is set to 10s by default in the C implemenation, and as of version 0.8.0 of slyphon-zookeeper has yet to be exposed as an option. That feature is planned.

Note:

The documentation for 0.9.0 was incorrect in stating the number of threads used to deliver events. There was one, unconfigurable, event dispatch thread. In 1.0 the number of event delivery threads is configurable, but still defaults to 1 and users are discouraged from adjusting the value due to the complexity this introduces. In 1.1 there is a better option for achieving higher concurrency (see the :thread option)

The Management apologizes for any confusion this may have caused.

Construct a new threaded client.

Parameters:

  • host (String)

    (see ZK::Client::Base#initialize)

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :reconnect (true, false) — default: true

    if true, we will register the equivalent of on_session_expired { zk.reopen } so that in the case of an expired session, we will keep trying to reestablish the connection.

  • :thread (:single, :per_callback) — default: :single

    choose your event delivery model:

    • :single: There is one thread, and only one callback is called at a time. This is the default mode (for now), and will provide the most safety for your app. All events will be delivered as received, to callbacks in the order they were registered. This safety has the tradeoff that if one of your callbacks performs some action that blocks the delivery thread, you will not recieve other events until it returns. You're also limiting the concurrency of your app. This should be fine for most simple apps, and is a good choice to start with when developing your application

    • :per_callback: This option will use a new-style Actor model (inspired by Celluloid) that uses a per-callback queue and thread to allow for greater concurrency in your app, whille still maintaining some kind of sanity. By choosing this option your callbacks will receive events in order, and will receive only one at a time, but in parallel with other callbacks. This model has the advantage you can have all of your callbacks making progress in parallel, and if one of them happens to block, it will not affect the others.

    • see the wiki for a discussion and demonstration of the effect of this setting.

  • :timeout (Fixnum)

    how long we will wait for the connection to be established. If timeout is nil, we will wait forever: use carefully.

Yields:

  • (self)

    calls the block with the new instance after the event handler and threadpool have been set up, but before any connections have been made. This allows the client to register watchers for session events like connected. You cannot perform any other operations with the client as you will get a NoMethodError (the underlying connection is nil).

See Also:

Since:

  • 1.1: Instead of adjusting the threadpool, users are strongly encouraged to use the :thread => :per_callback option to increase the parallelism of event delivery safely and sanely. Please see this wiki article for more information and a demonstration.



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/zk/client/threaded.rb', line 116

def initialize(host, opts={}, &b)
  super(host, opts)

  tp_size = opts.fetch(:threadpool_size, DEFAULT_THREADPOOL_SIZE)
  @threadpool = Threadpool.new(tp_size)

  @session_timeout = opts.fetch(:timeout, DEFAULT_TIMEOUT) # maybe move this into superclass?
  @event_handler   = EventHandler.new(self, opts)

  @reconnect = opts.fetch(:reconnect, true)

  @mutex = Mutex.new

  @close_requested = false

  yield self if block_given?

  @cnx = create_connection(host, @session_timeout, @event_handler.get_default_watcher_block)
end

Instance Method Details

#close!Object

Note:

We will make our best effort to do the right thing if you call this method while in the threadpool. It is a much better idea to call us from the main thread, or at least a thread we're not going to be trying to shut down as part of closing the connection and threadpool.

close the underlying connection and clear all pending events.



150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/zk/client/threaded.rb', line 150

def close!
  @mutex.synchronize do 
    return if @close_requested
    @close_requested = true 
  end

  on_tpool = on_threadpool?

  # Ok, so the threadpool will wait up to N seconds while joining each thread.
  # If _we're on a threadpool thread_, have it wait until we're ready to jump
  # out of this method, and tell it to wait up to 5 seconds to let us get
  # clear, then do the rest of the shutdown of the connection 
  #
  # if the user *doesn't* hate us, then we just join the shutdown_thread immediately
  # and wait for it to exit
  #
  shutdown_thread = Thread.new do
    @threadpool.shutdown(2)
    super
  end

  shutdown_thread.join unless on_tpool

  nil
end

#on_exception(&blk) ⇒ Object

Note:

if your exception callback block itself raises an exception, I will make fun of you.

register a block to be called back with unhandled exceptions that occur in the threadpool.



182
183
184
# File 'lib/zk/client/threaded.rb', line 182

def on_exception(&blk)
  @threadpool.on_exception(&blk)
end

#on_threadpool?Boolean

returns true if the current thread is one of the threadpool threads

Returns:

  • (Boolean)


177
178
179
# File 'lib/zk/client/threaded.rb', line 177

def on_threadpool?
  @threadpool and @threadpool.on_threadpool?
end

#reopen(timeout = nil) ⇒ Symbol

reopen the underlying connection

The timeout param is here mainly for legacy support.

Parameters:

  • timeout (Numeric) (defaults to: nil)

    how long should we wait for the connection to reach a connected state before returning. Note that the method will not raise and will return whether the connection reaches the 'connected' state or not. The default is actually to use the same value that was passed to the constructor for 'timeout'

Returns:

  • (Symbol)

    state of connection after operation



137
138
139
140
# File 'lib/zk/client/threaded.rb', line 137

def reopen(timeout=nil)
  @mutex.synchronize { @close_requested = false }
  super
end