Class: ZK::NodeDeletionWatcher
- Inherits:
- Object (Object → ZK::NodeDeletionWatcher)
- Includes:
- Exceptions, Logging, Zookeeper::Constants
- Defined in:
- lib/zk/node_deletion_watcher.rb
Instance Attribute Summary
- #path ⇒ Object (readonly)
  Returns the value of attribute path.
- #zk ⇒ Object (readonly)
  Returns the value of attribute zk.
Instance Method Summary
- #block_until_deleted ⇒ Object
- #blocked? ⇒ Boolean
- #done? ⇒ Boolean
- #initialize(zk, path) ⇒ NodeDeletionWatcher (constructor)
  A new instance of NodeDeletionWatcher.
- #interrupt! ⇒ Object
  Causes a thread blocked on this watcher to be awakened and have a WakeUpException raised.
- #wait_until_blocked(timeout = nil) ⇒ true?
  For testing: lets a caller wait until this object has entered the blocking state.
Methods included from Logging
Constructor Details
#initialize(zk, path) ⇒ NodeDeletionWatcher
Returns a new instance of NodeDeletionWatcher.
    # File 'lib/zk/node_deletion_watcher.rb', line 18
    def initialize(zk, path)
      @zk     = zk
      @path   = path.dup

      @subs   = []

      @mutex  = Monitor.new # ffs, 1.8.7 compatibility w/ timeouts
      @cond   = @mutex.new_cond

      @blocked  = NOT_YET
      @result   = nil
    end
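The constructor pairs a Monitor with a condition variable created by new_cond. The following is a minimal sketch of that pairing using only Ruby's standard library; ResultBox, deliver, and take are hypothetical names chosen for illustration and are not part of ZK.

```ruby
require 'monitor'

# Hypothetical stand-in for the watcher's internal state; not part of ZK.
class ResultBox
  def initialize
    @mutex  = Monitor.new
    @cond   = @mutex.new_cond
    @result = nil
  end

  # Store a result and wake every thread waiting on the condition.
  def deliver(value)
    @mutex.synchronize do
      @result = value
      @cond.broadcast
    end
  end

  # Block until a result has been delivered, then return it.
  def take
    @mutex.synchronize do
      @cond.wait_until { @result }
      @result
    end
  end
end

box    = ResultBox.new
waiter = Thread.new { box.take }
box.deliver(:deleted)
waiter.value # the waiting thread returns the delivered value
```

Because wait_until checks its predicate before waiting, the sketch behaves the same whether deliver runs before or after the waiting thread calls take.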
Instance Attribute Details
#path ⇒ Object (readonly)
Returns the value of attribute path.
    # File 'lib/zk/node_deletion_watcher.rb', line 16
    def path
      @path
    end
#zk ⇒ Object (readonly)
Returns the value of attribute zk.
    # File 'lib/zk/node_deletion_watcher.rb', line 16
    def zk
      @zk
    end
Instance Method Details
#block_until_deleted ⇒ Object
    # File 'lib/zk/node_deletion_watcher.rb', line 89
    def block_until_deleted
      @mutex.synchronize do
        raise InvalidStateError, "Already fired for #{path}" if @result
        register_callbacks

        unless zk.exists?(path, :watch => true)
          # we are done, these are one-shot, so write the results
          @result = :deleted
          @blocked = NOT_ANYMORE
          @cond.broadcast # wake any waiting threads
          return true
        end

        logger.debug { "ok, going to block: #{path}" }

        @blocked = BLOCKED
        @cond.broadcast                 # wake threads waiting for @blocked to change
        @cond.wait_until { @result }    # wait until we get a result
        @blocked = NOT_ANYMORE

        case @result
        when :deleted
          logger.debug { "path #{path} was deleted" }
          return true
        when INTERRUPTED
          raise ZK::Exceptions::WakeUpException
        when ZOO_EXPIRED_SESSION_STATE
          raise Zookeeper::Exceptions::SessionExpired
        when ZOO_CONNECTING_STATE
          raise Zookeeper::Exceptions::NotConnected
        when ZOO_CLOSED_STATE
          raise Zookeeper::Exceptions::ConnectionClosed
        else
          raise "Hit unexpected case in block_until_node_deleted, result was: #{@result.inspect}"
        end
      end
    ensure
      unregister_callbacks
    end
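The method above is one-shot: it returns immediately if the node is already gone, blocks otherwise, and refuses to run again once a result exists. The sketch below imitates that flow without ZooKeeper. MiniDeletionWatcher, node_deleted!, and the exists callable are hypothetical stand-ins (a lambda replaces zk.exists?, and node_deleted! replaces the watch callback), so treat it as an illustration of the control flow rather than the library's implementation.

```ruby
require 'monitor'

# Hypothetical, simplified stand-in; it never talks to ZooKeeper.
class MiniDeletionWatcher
  def initialize(exists)
    @exists  = exists            # callable: true while the node exists
    @mutex   = Monitor.new
    @cond    = @mutex.new_cond
    @blocked = false
    @result  = nil
  end

  def blocked?
    @mutex.synchronize { @blocked }
  end

  # Stand-in for the watch callback that fires on a deletion event.
  def node_deleted!
    @mutex.synchronize do
      @result = :deleted
      @cond.broadcast
    end
  end

  def block_until_deleted
    @mutex.synchronize do
      raise "already fired" if @result      # one-shot: reuse is an error
      unless @exists.call
        @result = :deleted                  # node already gone: do not block
        return true
      end
      @blocked = true
      @cond.wait_until { @result }          # releases the monitor while waiting
      @blocked = false
      true
    end
  end
end

gone = MiniDeletionWatcher.new(-> { false })
gone.block_until_deleted            # returns true without blocking

live    = MiniDeletionWatcher.new(-> { true })
blocker = Thread.new { live.block_until_deleted }
sleep 0.01 until live.blocked?      # wait for the thread to start blocking
live.node_deleted!
blocker.value                       # returns true once the deletion is signalled
```

The polling loop on blocked? is there to avoid signalling the deletion before the thread has begun to wait, which in this sketch would trip the one-shot check.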
#blocked? ⇒ Boolean
    # File 'lib/zk/node_deletion_watcher.rb', line 35
    def blocked?
      @mutex.synchronize { @blocked == BLOCKED }
    end
#done? ⇒ Boolean
    # File 'lib/zk/node_deletion_watcher.rb', line 31
    def done?
      @mutex.synchronize { !!@result }
    end
#interrupt! ⇒ Object
Causes a thread blocked on this watcher to be awakened and have a WakeUpException raised.
If a result has already been delivered, this does nothing.
If a result has not yet been delivered, any thread calling block_until_deleted will receive the exception immediately.
    # File 'lib/zk/node_deletion_watcher.rb', line 77
    def interrupt!
      @mutex.synchronize do
        case @blocked
        when NOT_YET, BLOCKED
          @result = INTERRUPTED
          @cond.broadcast
        else
          return
        end
      end
    end
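As a rough illustration of the interrupt path, the sketch below wakes a blocked thread by writing an interrupted result and broadcasting, and ignores interrupts once a result has been consumed. InterruptibleWait, WakeUp, and the symbol values are assumptions made for this example; WakeUp merely stands in for ZK::Exceptions::WakeUpException.

```ruby
require 'monitor'

# Hypothetical stand-in for ZK::Exceptions::WakeUpException.
class WakeUp < StandardError; end

# Hypothetical, simplified model of the blocked/interrupted states.
class InterruptibleWait
  def initialize
    @mutex   = Monitor.new
    @cond    = @mutex.new_cond
    @blocked = :not_yet          # assumed values: :not_yet, :blocked, :not_anymore
    @result  = nil
  end

  def blocked?
    @mutex.synchronize { @blocked == :blocked }
  end

  # Block until some result arrives; raise WakeUp if it was an interrupt.
  def wait
    @mutex.synchronize do
      @blocked = :blocked
      @cond.wait_until { @result }
      @blocked = :not_anymore
      raise WakeUp if @result == :interrupted
      @result
    end
  end

  # Wake a waiter with an interrupt; a no-op once a result was consumed.
  def interrupt!
    @mutex.synchronize do
      return if @blocked == :not_anymore
      @result = :interrupted
      @cond.broadcast
    end
  end
end

waiter = InterruptibleWait.new
thread = Thread.new do
  begin
    waiter.wait
  rescue WakeUp
    :woken_by_interrupt
  end
end
sleep 0.01 until waiter.blocked?   # make sure the thread is really waiting
waiter.interrupt!
thread.value                       # the rescue branch runs in the blocked thread
```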
#wait_until_blocked(timeout = nil) ⇒ true?
This is for testing; it lets a caller wait until this object has entered the blocking state.
It avoids a race: if this object has already been blocked and released, the call returns without blocking the caller.
Pass an optional timeout (in seconds) to return after that amount of time, or nil to block forever.
    # File 'lib/zk/node_deletion_watcher.rb', line 51
    def wait_until_blocked(timeout=nil)
      @mutex.synchronize do
        return true unless @blocked == NOT_YET

        start = Time.now
        time_to_stop = timeout ? (start + timeout) : nil

        logger.debug { "#{__method__} @blocked: #{@blocked.inspect} about to wait" }
        @cond.wait(timeout)

        if (time_to_stop and (Time.now > time_to_stop)) and (@blocked == NOT_YET)
          return nil
        end

        (@blocked == NOT_YET) ? nil : true
      end
    end
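The timed wait above can be pictured with the following standalone sketch. It is a variant, not the method's own logic: instead of a single @cond.wait(timeout), it loops against a monotonic deadline so that an early wake-up does not end the wait prematurely. BlockedFlag and mark_blocked are hypothetical names.

```ruby
require 'monitor'

# Hypothetical stand-in that models only the "has it blocked yet?" flag.
class BlockedFlag
  def initialize
    @mutex   = Monitor.new
    @cond    = @mutex.new_cond
    @blocked = false
  end

  # Record that the blocking state was reached and wake any waiters.
  def mark_blocked
    @mutex.synchronize do
      @blocked = true
      @cond.broadcast
    end
  end

  # Returns true once marked, or nil if the timeout (seconds) elapses first.
  # A nil timeout waits forever.
  def wait_until_blocked(timeout = nil)
    @mutex.synchronize do
      deadline = timeout && (now + timeout)
      until @blocked
        remaining = deadline && (deadline - now)
        return nil if remaining && remaining <= 0
        @cond.wait(remaining)
      end
      true
    end
  end

  private

  # Monotonic clock, unaffected by wall-clock adjustments.
  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

flag = BlockedFlag.new
flag.wait_until_blocked(0.05)   # nil: nothing marked the flag in time
flag.mark_blocked
flag.wait_until_blocked(0.05)   # true: returns without waiting
```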