Class: ZK::Client::Threaded
- Includes: Conveniences, StateMixin, Unixisms, Logging
- Defined in: lib/zk/client/threaded.rb
Overview
This is the default client that ZK will use. In the zk-eventmachine gem, there is an Evented client.
If you want to register on_* callbacks (see ZK::Client::StateMixin), pass a block to the constructor. The block is called before the connection is set up, so callbacks registered inside it can receive the on_connected event. See the 'Register on_connected callback' example.
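The ordering matters, and it can be illustrated without a ZooKeeper server. What follows is a minimal sketch, not ZK's implementation: `SketchClient` is a hypothetical stand-in that yields itself before "connecting", the same order in which the constructor yields `self` before calling `create_connection`. With the real client the corresponding call would take the form `ZK::Client::Threaded.new(host) { |zk| zk.on_connected { ... } }`.

```ruby
# Illustration only: SketchClient is a hypothetical stand-in, not part of ZK.
class SketchClient
  def initialize(host)
    @host = host
    @connected_callbacks = []
    yield self if block_given?   # callbacks are registered here...
    connect                      # ...before the connection is set up
  end

  def on_connected(&blk)
    @connected_callbacks << blk
  end

  private

  # Pretend the connection succeeded and deliver the connected event.
  def connect
    @connected_callbacks.each { |cb| cb.call(@host) }
  end
end

seen = []

# Registered inside the block: the callback exists when the event fires.
client = SketchClient.new('localhost:2181') do |c|
  c.on_connected { |host| seen << [:in_block, host] }
end

# Registered after construction: in this sketch the event has already fired.
client.on_connected { |host| seen << [:too_late, host] }
```

In this sketch only the callback registered inside the block is invoked, which is the reason for passing a block rather than registering callbacks on the returned object.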
A note on event delivery: there has been some confusion, caused by incorrect documentation (which I'm very sorry about), about how many threads deliver events. The documentation for 0.9.0 misstated the number of threads used to deliver events; there was one, unconfigurable, event dispatch thread. In 1.0 the number of event delivery threads is configurable, but still defaults to 1.
If you use the threadpool/event callbacks to perform work, you may be interested in registering an on_exception callback, which will receive all exceptions that occur on the threadpool and are not handled (i.e. that bubble up to the top of a block).
Constant Summary
- DEFAULT_THREADPOOL_SIZE = 1
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary
- #close! ⇒ Object
  Close the underlying connection and clear all pending events.
- #initialize(host, opts = {}) {|self| ... } ⇒ Threaded (constructor)
  Construct a new threaded client.
- #on_exception(&blk) ⇒ Object
  Register a block to be called back with unhandled exceptions that occur in the threadpool.
- #on_threadpool? ⇒ Boolean
  Returns true if the current thread is one of the threadpool threads.
- #reopen(timeout = nil) ⇒ Symbol
  Reopen the underlying connection.
Methods included from Conveniences
#defer, #election_candidate, #election_observer, #locker, #queue, #shared_locker, #with_lock
Methods included from Unixisms
#block_until_node_deleted, #find, #mkdir_p, #rm_rf
Methods included from StateMixin
#associating?, #connected?, #connecting?, #expired_session?, #on_connected, #on_connecting, #on_expired_session, #state, #wrap_state_closed_error
Methods inherited from Base
#children, #closed?, #create, #delete, #event_dispatch_thread?, #exists?, #get, #get_acl, #register, #safe_session_id, #session_id, #session_passwd, #set, #set_acl, #stat, #watcher
Constructor Details
#initialize(host, opts = {}) {|self| ... } ⇒ Threaded
The :timeout argument here is not the session_timeout for the connection. Rather, it is the amount of time we wait for the connection to be established. The session timeout exchanged with the server is set to 10s by default in the C implementation, and as of version 0.8.0 of slyphon-zookeeper has yet to be exposed as an option. That feature is planned.
The documentation for 0.9.0 was incorrect in stating the number of threads used to deliver events. There was one, unconfigurable, event dispatch thread. In 1.0 the number of event delivery threads is configurable, but still defaults to 1, and users are discouraged from adjusting the value because of the complexity this introduces. In 1.1 there is a better option for achieving higher concurrency (see the :thread option). The Management apologizes for any confusion this may have caused.
Construct a new threaded client.
# File 'lib/zk/client/threaded.rb', line 116
def initialize(host, opts={}, &b)
  super(host, opts)

  tp_size = opts.fetch(:threadpool_size, DEFAULT_THREADPOOL_SIZE)
  @threadpool = Threadpool.new(tp_size)

  @session_timeout = opts.fetch(:timeout, DEFAULT_TIMEOUT) # maybe move this into superclass?
  @event_handler = EventHandler.new(self, opts)

  @reconnect = opts.fetch(:reconnect, true)

  @mutex = Mutex.new

  @close_requested = false

  yield self if block_given?

  @cnx = create_connection(host, @session_timeout, @event_handler.get_default_watcher_block)
end
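The constructor reads each option with `Hash#fetch` and a default, which behaves slightly differently from `opts[:key] || default`. The sketch below isolates that pattern. `resolve_options` is a hypothetical helper, and the `DEFAULT_TIMEOUT` value is an assumed placeholder (the page does not state it); only `DEFAULT_THREADPOOL_SIZE = 1` and the `true` default for `:reconnect` come from the source.

```ruby
# Stand-in constants: DEFAULT_THREADPOOL_SIZE matches the page;
# DEFAULT_TIMEOUT is an assumed placeholder value for illustration.
DEFAULT_THREADPOOL_SIZE = 1
DEFAULT_TIMEOUT = 10

# Hypothetical helper mirroring the opts.fetch calls in the constructor.
def resolve_options(opts = {})
  {
    threadpool_size: opts.fetch(:threadpool_size, DEFAULT_THREADPOOL_SIZE),
    timeout:         opts.fetch(:timeout, DEFAULT_TIMEOUT),
    reconnect:       opts.fetch(:reconnect, true)
  }
end

defaults     = resolve_options
overrides    = resolve_options(timeout: 30, reconnect: false)

# fetch only falls back when the key is absent, so an explicit nil
# or false is passed through rather than replaced by the default.
explicit_nil = resolve_options(timeout: nil)
```

One consequence worth keeping in mind: passing `reconnect: false` works as expected, and passing `timeout: nil` does not fall back to the default.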
Instance Method Details
#close! ⇒ Object
We will make our best effort to do the right thing if you call this method while in the threadpool. It is a much better idea to call us from the main thread, or at least a thread we're not going to be trying to shut down as part of closing the connection and threadpool.
close the underlying connection and clear all pending events.
# File 'lib/zk/client/threaded.rb', line 150
def close!
  @mutex.synchronize do
    return if @close_requested
    @close_requested = true
  end

  on_tpool = on_threadpool?

  # Ok, so the threadpool will wait up to N seconds while joining each thread.
  # If _we're on a threadpool thread_, have it wait until we're ready to jump
  # out of this method, and tell it to wait up to 5 seconds to let us get
  # clear, then do the rest of the shutdown of the connection
  #
  # if the user *doesn't* hate us, then we just join the shutdown_thread immediately
  # and wait for it to exit
  #
  shutdown_thread = Thread.new do
    @threadpool.shutdown(2)
    super
  end

  shutdown_thread.join unless on_tpool

  nil
end
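The shape of close! (a guarded flag, a separate shutdown thread, and a join that is skipped when the caller is itself a pool thread) can be tried in isolation. This is a minimal sketch under stated assumptions: `SketchPool` and `SketchCloser` are hypothetical stand-ins with a single worker thread, and the counter replaces the `super` call that closes the real connection.

```ruby
# Illustration only: both classes are hypothetical stand-ins, not ZK's code.
class SketchPool
  def initialize
    @queue = Queue.new
    @thread = Thread.new do
      while (job = @queue.pop) != :stop
        job.call
      end
    end
  end

  def on_threadpool?
    Thread.current == @thread
  end

  # Ask the worker to stop and wait up to +timeout+ seconds for it.
  def shutdown(timeout)
    @queue << :stop
    @thread.join(timeout)
  end
end

class SketchCloser
  attr_reader :shutdowns

  def initialize
    @pool = SketchPool.new
    @mutex = Mutex.new
    @close_requested = false
    @shutdowns = 0
  end

  def close!
    @mutex.synchronize do
      return if @close_requested   # second and later calls are no-ops
      @close_requested = true
    end

    on_pool = @pool.on_threadpool?

    shutdown_thread = Thread.new do
      @pool.shutdown(2)
      @shutdowns += 1              # stands in for closing the connection
    end

    # A pool thread must not block here: the pool is waiting for it to exit.
    shutdown_thread.join unless on_pool
    nil
  end
end

closer = SketchCloser.new
first  = closer.close!
second = closer.close!
```

Called from the main thread, the first call blocks until shutdown completes and the second returns immediately, so the shutdown work runs once.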
#on_exception(&blk) ⇒ Object
If your exception callback block itself raises an exception, I will make fun of you.
register a block to be called back with unhandled exceptions that occur in the threadpool.
# File 'lib/zk/client/threaded.rb', line 182
def on_exception(&blk)
  @threadpool.on_exception(&blk)
end
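To make "unhandled exceptions that occur in the threadpool" concrete, here is a minimal sketch of how such a hook could work. `SketchThreadpool` and its `defer` and `shutdown` methods are hypothetical stand-ins written for this illustration; the real `Threadpool` class may be implemented differently.

```ruby
# Illustration only: SketchThreadpool is a hypothetical stand-in.
class SketchThreadpool
  def initialize
    @queue = Queue.new
    @handlers = []
    @thread = Thread.new { run }
  end

  def on_exception(&blk)
    @handlers << blk
  end

  def defer(&blk)
    @queue << blk
  end

  def shutdown
    @queue << :stop
    @thread.join
  end

  private

  def run
    while (job = @queue.pop) != :stop
      begin
        job.call
      rescue StandardError => e
        # Hand anything the job did not rescue to the registered blocks.
        @handlers.each { |h| h.call(e) }
      end
    end
  end
end

pool = SketchThreadpool.new
caught = []
pool.on_exception { |e| caught << e.message }

pool.defer { raise ArgumentError, 'boom' }   # bubbles up: reaches on_exception
pool.defer { raise 'quiet' rescue nil }      # rescued in the block: does not
pool.shutdown
```

Only the exception that escapes its block reaches the callback. The sketch makes no attempt to protect itself from a callback that raises, which is the situation the note above warns about.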
#on_threadpool? ⇒ Boolean
returns true if the current thread is one of the threadpool threads
# File 'lib/zk/client/threaded.rb', line 177
def on_threadpool?
  @threadpool and @threadpool.on_threadpool?
end
#reopen(timeout = nil) ⇒ Symbol
reopen the underlying connection
The timeout param is here mainly for legacy support.
# File 'lib/zk/client/threaded.rb', line 137
def reopen(timeout=nil)
  @mutex.synchronize { @close_requested = false }
  super
end