Class: Zookeeper::CZookeeper

Inherits:
Object
  • Object
show all
Includes:
Constants, Exceptions, Forked, Logger
Defined in:
ext/c_zookeeper.rb,
ext/zkrb.c

Overview

NOTE: this class extends (reopens) the class defined in zkrb.c

Defined Under Namespace

Classes: ClientId, GotNilEventException

Constant Summary collapse

DEFAULT_SESSION_TIMEOUT_MSEC =
10000

Constants included from Exceptions

Exceptions::ExpiredSession

Constants included from Constants

Zookeeper::Constants::CONNECTED_EVENT_VALUES, Zookeeper::Constants::EVENT_TYPE_NAMES, Zookeeper::Constants::STATE_NAMES, Zookeeper::Constants::ZAPIERROR, Zookeeper::Constants::ZAUTHFAILED, Zookeeper::Constants::ZBADARGUMENTS, Zookeeper::Constants::ZBADVERSION, Zookeeper::Constants::ZCLOSING, Zookeeper::Constants::ZCONNECTIONLOSS, Zookeeper::Constants::ZDATAINCONSISTENCY, Zookeeper::Constants::ZINVALIDACL, Zookeeper::Constants::ZINVALIDCALLBACK, Zookeeper::Constants::ZINVALIDSTATE, Zookeeper::Constants::ZKRB_ASYNC_CONTN_ID, Zookeeper::Constants::ZKRB_GLOBAL_CB_REQ, Zookeeper::Constants::ZMARSHALLINGERROR, Zookeeper::Constants::ZNOAUTH, Zookeeper::Constants::ZNOCHILDRENFOREPHEMERALS, Zookeeper::Constants::ZNODEEXISTS, Zookeeper::Constants::ZNONODE, Zookeeper::Constants::ZNOTEMPTY, Zookeeper::Constants::ZNOTHING, Zookeeper::Constants::ZOK, Zookeeper::Constants::ZOO_ASSOCIATING_STATE, Zookeeper::Constants::ZOO_AUTH_FAILED_STATE, Zookeeper::Constants::ZOO_CHANGED_EVENT, Zookeeper::Constants::ZOO_CHILD_EVENT, Zookeeper::Constants::ZOO_CLOSED_STATE, Zookeeper::Constants::ZOO_CONNECTED_STATE, Zookeeper::Constants::ZOO_CONNECTING_STATE, Zookeeper::Constants::ZOO_CREATED_EVENT, Zookeeper::Constants::ZOO_DELETED_EVENT, Zookeeper::Constants::ZOO_EPHEMERAL, Zookeeper::Constants::ZOO_EXPIRED_SESSION_STATE, Zookeeper::Constants::ZOO_LOG_LEVEL_DEBUG, Zookeeper::Constants::ZOO_LOG_LEVEL_ERROR, Zookeeper::Constants::ZOO_LOG_LEVEL_INFO, Zookeeper::Constants::ZOO_LOG_LEVEL_WARN, Zookeeper::Constants::ZOO_NOTWATCHING_EVENT, Zookeeper::Constants::ZOO_SEQUENCE, Zookeeper::Constants::ZOO_SESSION_EVENT, Zookeeper::Constants::ZOPERATIONTIMEOUT, Zookeeper::Constants::ZRUNTIMEINCONSISTENCY, Zookeeper::Constants::ZSESSIONEXPIRED, Zookeeper::Constants::ZSESSIONMOVED, Zookeeper::Constants::ZSYSTEMERROR, Zookeeper::Constants::ZUNIMPLEMENTED

Constants included from ACLs::Constants

ACLs::Constants::ZOO_ANYONE_ID_UNSAFE, ACLs::Constants::ZOO_AUTH_IDS, ACLs::Constants::ZOO_CREATOR_ALL_ACL, ACLs::Constants::ZOO_OPEN_ACL_UNSAFE, ACLs::Constants::ZOO_PERM_ADMIN, ACLs::Constants::ZOO_PERM_ALL, ACLs::Constants::ZOO_PERM_CREATE, ACLs::Constants::ZOO_PERM_DELETE, ACLs::Constants::ZOO_PERM_READ, ACLs::Constants::ZOO_PERM_WRITE, ACLs::Constants::ZOO_READ_ACL_UNSAFE

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Logger

included, set_default

Methods included from Exceptions

by_code, raise_on_error

Methods included from Constants

#event_by_value, #state_by_value

Methods included from Forked

#forked?, #update_pid!

Constructor Details

#initialize(host, event_queue, opts = {}) ⇒ CZookeeper

Returns a new instance of CZookeeper.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'ext/c_zookeeper.rb', line 42

# Builds a CZookeeper: initialises the bookkeeping state below, calls
# zkrb_init with the host, and then starts the event thread.
#
# host        - stored in @host, interpolated into the log line and passed to
#               zkrb_init
# event_queue - stored in @event_queue; not otherwise used in this method
# opts        - accepted but never referenced in this method
def initialize(host, event_queue, opts={})
  @host = host
  @event_queue = event_queue

  # keep track of the pid that created us
  update_pid!
  
  # used by the C layer. CZookeeper sets this to true when the init method
  # has completed. once this is set to true, it stays true.
  #
  # you should grab the @mutex before messing with this flag
  @_running = nil

  # This is set to true after destroy_zkrb_instance has been called and all
  # CZookeeper state has been cleaned up
  @_closed = false  # also used by the C layer

  # set by the ruby side to indicate we are in shutdown mode used by method_get_next_event
  @_shutting_down = false

  # the actual C data is stashed in this ivar. never *ever* touch this
  @_data = nil

  # starts out at the class default; nothing in this method overrides it
  @_session_timeout_msec = DEFAULT_SESSION_TIMEOUT_MSEC

  # a Monitor; both condition variables below are created from it
  @mutex = Monitor.new
  
  # used to signal that we're running
  @running_cond = @mutex.new_cond

  # used to signal we've received the connected event
  @state_cond = @mutex.new_cond

  # the current state of the connection (starts out as closed)
  @state = ZOO_CLOSED_STATE

  # both ends of this pipe are closed again in #close
  @pipe_read, @pipe_write = IO.pipe
  
  # no event thread exists until start_event_thread is called below
  @event_thread = nil

  # hash of in-flight Continuation instances
  @reg = Continuation::Registry.new

  # NOTE: log_level is only referenced by the commented-out argument on the
  # zkrb_init call below, so it currently has no effect
  log_level = ENV['ZKC_DEBUG'] ? ZOO_LOG_LEVEL_DEBUG : ZOO_LOG_LEVEL_ERROR

  logger.info { "initiating connection to #{@host}" }

  # presumably implemented in zkrb.c -- confirm there what it does with the ivars above
  zkrb_init(@host)#, :zkc_log_level => log_level)

  start_event_thread

  logger.debug { "init returned!" }
end

Instance Attribute Details

#original_pid ⇒ Object

Returns the value of attribute original_pid.



21
22
23
# File 'ext/c_zookeeper.rb', line 21

# Reader for the @original_pid instance variable.
#
# Returns whatever is currently stored in @original_pid (nil when unset).
def original_pid; @original_pid; end

Class Method Details

.get_debug_level ⇒ Object

Returns the cached debug level; if none has been set yet, assumes we’re at ZOO_LOG_LEVEL_INFO.



24
25
26
# File 'ext/c_zookeeper.rb', line 24

# Returns the debug level cached in @debug_level. When nothing truthy is cached
# yet, ZOO_LOG_LEVEL_INFO is stored and returned.
def self.get_debug_level
  @debug_level = ZOO_LOG_LEVEL_INFO unless @debug_level
  @debug_level
end

.set_debug_level(value) ⇒ Object



28
29
30
31
# File 'ext/c_zookeeper.rb', line 28

# Caches +value+ in @debug_level and hands it to set_zkrb_debug_level.
#
# value - the new debug level
#
# Returns whatever set_zkrb_debug_level returns.
def self.set_debug_level(value)
  set_zkrb_debug_level(@debug_level = value)
end

Instance Method Details

#associating? ⇒ Boolean

Returns:

  • (Boolean)


116
117
118
# File 'ext/c_zookeeper.rb', line 116

# True when the value reported by #state equals ZOO_ASSOCIATING_STATE.
def associating?
  current = state
  current == ZOO_ASSOCIATING_STATE
end

#close ⇒ Object



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'ext/c_zookeeper.rb', line 120

# Closes the underlying handle and both ends of the pipe held in
# @pipe_read / @pipe_write.
#
# Does nothing (and leaves the pipe alone) when #closed? already reports true.
# Otherwise:
#
# * when not forked?, the event thread is stopped first and the handle is then
#   closed while holding @mutex;
# * when forked?, the handle is closed directly, without stopping the event
#   thread or taking @mutex.
#
# close_handle is only called if @_closed is falsy and @_data is set.
#
# The pipe ends are closed in an +ensure+, so they are released even when
# closing the handle raises; the exception still propagates to the caller.
# If stop_event_thread itself raises, the pipe is deliberately left untouched.
#
# Returns nil.
def close
  return if closed?

  fn_close = proc do
    if !@_closed && @_data
      logger.debug { "CALLING CLOSE HANDLE!!" }
      close_handle
    end
  end

  in_forked_process = forked?

  # kept outside the begin/ensure: if the thread cannot be stopped we do not
  # want to pull the pipe out from under it
  stop_event_thread unless in_forked_process

  begin
    if in_forked_process
      fn_close.call
    else
      @mutex.synchronize(&fn_close)
    end
  ensure
    # always release the file descriptors, even if close_handle blew up
    [@pipe_read, @pipe_write].each { |io| io.close unless io.closed? }
  end

  nil
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


96
97
98
# File 'ext/c_zookeeper.rb', line 96

# Reads the @_closed flag while holding @mutex and coerces it to true/false.
def closed?
  @mutex.synchronize { @_closed ? true : false }
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


108
109
110
# File 'ext/c_zookeeper.rb', line 108

# True when the value reported by #state equals ZOO_CONNECTED_STATE.
def connected?
  current = state
  current == ZOO_CONNECTED_STATE
end

#connecting? ⇒ Boolean

Returns:

  • (Boolean)


112
113
114
# File 'ext/c_zookeeper.rb', line 112

# True when the value reported by #state equals ZOO_CONNECTING_STATE.
def connecting?
  current = state
  current == ZOO_CONNECTING_STATE
end

#pause_before_fork_in_parent ⇒ Object

Call this to stop the event loop; you can start it again with #resume_after_fork_in_parent.

requests may still be added during this time, but they will not be processed until you call resume



147
148
149
150
# File 'ext/c_zookeeper.rb', line 147

# Logs the method name at debug level, then calls stop_event_thread while
# holding @mutex.
#
# Returns whatever stop_event_thread returns.
def pause_before_fork_in_parent
  logger.debug { "##{__method__}" }

  @mutex.synchronize do
    stop_event_thread
  end
end

#resume_after_fork_in_parent ⇒ Object

Call this to start the event loop again if #pause_before_fork_in_parent was previously called.



153
154
155
156
157
158
159
160
# File 'ext/c_zookeeper.rb', line 153

# Logs the method name at debug level, then -- while holding @mutex -- resets
# @_shutting_down to nil and calls start_event_thread.
#
# Returns whatever start_event_thread returns.
def resume_after_fork_in_parent
  logger.debug { "##{__method__}" }

  @mutex.synchronize {
    @_shutting_down = nil
    start_event_thread
  }
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


100
101
102
# File 'ext/c_zookeeper.rb', line 100

# Reads the @_running flag while holding @mutex and coerces it to true/false.
def running?
  @mutex.synchronize { @_running ? true : false }
end

#shutting_down? ⇒ Boolean

Returns:

  • (Boolean)


104
105
106
# File 'ext/c_zookeeper.rb', line 104

# Reads the @_shutting_down flag while holding @mutex and coerces it to
# true/false.
def shutting_down?
  @mutex.synchronize { @_shutting_down ? true : false }
end

#state ⇒ Object



162
163
164
165
# File 'ext/c_zookeeper.rb', line 162

# Reports the connection state: ZOO_CLOSED_STATE once #closed? is true,
# otherwise the value of @state read while holding @mutex.
def state
  closed? ? ZOO_CLOSED_STATE : @mutex.synchronize { @state }
end

#wait_until_connected(timeout = 10) ⇒ Object

this implementation is gross, but i don’t really see another way of doing it without more grossness

returns true if we’re connected, false if we’re not

if timeout is nil, we never time out, and wait forever for CONNECTED state



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
# File 'ext/c_zookeeper.rb', line 174

# Blocks until @state becomes ZOO_CONNECTED_STATE, the instance starts shutting
# down or is closed, or the timeout elapses.
#
# timeout - seconds to wait (default 10); nil/false means wait without a
#           deadline. The same value is also passed to wait_until_running,
#           while the deadline is computed before that call.
#
# Returns false immediately if wait_until_running returns a falsy value,
# otherwise the result of #connected? once the wait is over.
def wait_until_connected(timeout=10)
  deadline = timeout ? Time.now + timeout : nil

  return false unless wait_until_running(timeout)

  @mutex.synchronize do
    # @state_cond.wait can return before we are connected, so re-check each time
    until (@state == ZOO_CONNECTED_STATE) || @_shutting_down || @_closed
      if deadline
        current = Time.now
        break if current > deadline
        @state_cond.wait(deadline.to_f - current.to_f)
      else
        @state_cond.wait
      end
    end
  end

  connected?
end