Class: ZK::ZKEventMachine::Client

Inherits:
Client::Base
  • Object
show all
Includes:
Deferred::Accessors, Logging, Unixisms
Defined in:
lib/z_k/z_k_event_machine/client.rb

Constant Summary collapse

# Timeout value handed to ZookeeperEM::Client.new by #connect
# (NOTE(review): units are not shown here — confirm against ZookeeperEM).
DEFAULT_TIMEOUT =
10

Instance Method Summary collapse

Methods included from Unixisms

#mkdir_p, #rm_rf

Constructor Details

#initialize(host, opts = {}) ⇒ Client

Takes same options as ZK::Client::Base



55
56
57
58
59
60
# File 'lib/z_k/z_k_event_machine/client.rb', line 55

# Sets up client state for the given host. No connection object is created
# here; that happens lazily in #connect.
#
# @param host stored in @host and later passed to ZookeeperEM::Client.new
# @param opts [Hash] NOTE(review): accepted but not referenced in this method,
#   and super is not called here — confirm whether these options are meant to
#   be forwarded to ZK::Client::Base.
def initialize(host, opts={})
  @host = host
  # The event handler is given a reference to this client.
  @event_handler  = EventHandlerEM.new(self)
  # Read by #close! so that only the first call performs the shutdown.
  @closing        = false
  register_default_event_handlers!
end

Instance Method Details

#children(path, opts = {}, &block) ⇒ Object



159
160
161
162
163
164
165
# File 'lib/z_k/z_k_event_machine/client.rb', line 159

# Asynchronous children request for +path+.
#
# A children callback is built around the optional +block+; its errback is
# routed to +connection_lost_hook+, the details of this call are stored on it
# as +context+, and the superclass implementation is invoked with the callback
# merged into +opts+ under +:callback+. The caller's +opts+ is not modified.
#
# @param path znode path, passed through to the superclass call
# @param opts [Hash] extra options, passed through to the superclass call
# @param block optional block handed to Callback.new_children_cb
# @return whatever Callback.new_children_cb returns
def children(path, opts={}, &block)
  Callback.new_children_cb(block) do |callback|
    callback.errback(&method(:connection_lost_hook))
    callback.context = { :method => __method__, :path => path, :opts => opts }

    request_opts = opts.merge(:callback => callback)
    super(path, request_opts)
  end
end

#close!(&blk) ⇒ Object Also known as: close



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/z_k/z_k_event_machine/client.rb', line 83

# Shuts the client down, at most once.
#
# The optional block is registered on the +on_close+ deferred before anything
# else. If a close is already underway the deferred is returned right away.
# Otherwise the client is flagged as closing and:
#
# * with no connection object, +on_close+ is succeeded immediately;
# * with a connection object, the event handler is cleared and the connection
#   is asked to close; once it reports back, +on_close+ is succeeded and the
#   connection reference is dropped.
#
# @param blk optional block registered via +on_close+
# @return the +on_close+ deferred
def close!(&blk)
  on_close(&blk)
  return on_close if @closing

  @closing = true

  # Nothing to tear down: report success straight away.
  unless @cnx
    on_close.succeed
    return on_close
  end

  logger.debug { "#{self.class.name}: in close! clearing event_handler" }
  event_handler.clear!

  logger.debug { "#{self.class.name}: calling @cnx.close" }
  @cnx.close do
    logger.debug { "firing on_close handler" }
    on_close.succeed
    @cnx = nil
  end

  on_close
end

#connect(&blk) ⇒ Object

Opens a ZK connection and attaches it to the reactor. Returns an EM::Deferrable that will be called back when the connection is ready for use.



70
71
72
73
74
75
76
# File 'lib/z_k/z_k_event_machine/client.rb', line 70

# Creates the underlying ZookeeperEM::Client on first use (later calls reuse
# the existing one) and registers +blk+ through the connection's +on_attached+.
#
# @param blk optional block handed to +on_attached+
# @return whatever +on_attached+ returns
def connect(&blk)
  # XXX: maybe move this into initialize, need to figure out how to schedule it properly
  unless @cnx
    watcher = event_handler.get_default_watcher_block
    @cnx = ZookeeperEM::Client.new(@host, DEFAULT_TIMEOUT, watcher)
  end

  @cnx.on_attached(&blk)
end

#create(path, data = '', opts = {}, &block) ⇒ Object



119
120
121
122
123
124
125
# File 'lib/z_k/z_k_event_machine/client.rb', line 119

# Asynchronous create request for +path+.
#
# A create callback is built around the optional +block+; its errback is
# routed to +connection_lost_hook+, the details of this call are stored on it
# as +context+, and the superclass implementation is invoked with the callback
# merged into +opts+ under +:callback+. The caller's +opts+ is not modified.
#
# @param path znode path, passed through to the superclass call
# @param data payload passed through to the superclass call (defaults to '')
# @param opts [Hash] extra options, passed through to the superclass call
# @param block optional block handed to Callback.new_create_cb
# @return whatever Callback.new_create_cb returns
def create(path, data='', opts={}, &block)
  Callback.new_create_cb(block) do |callback|
    callback.errback(&method(:connection_lost_hook))
    callback.context = { :method => __method__, :path => path, :data => data, :opts => opts }

    request_opts = opts.merge(:callback => callback)
    super(path, data, request_opts)
  end
end

#delete(path, opts = {}, &block) ⇒ Object



151
152
153
154
155
156
157
# File 'lib/z_k/z_k_event_machine/client.rb', line 151

# Asynchronous delete request for +path+.
#
# A delete callback is built around the optional +block+; its errback is
# routed to +connection_lost_hook+, the details of this call are stored on it
# as +context+, and the superclass implementation is invoked with the callback
# merged into +opts+ under +:callback+. The caller's +opts+ is not modified.
#
# @param path znode path, passed through to the superclass call
# @param opts [Hash] extra options, passed through to the superclass call
# @param block optional block handed to Callback.new_delete_cb
# @return whatever Callback.new_delete_cb returns
def delete(path, opts={}, &block)
  Callback.new_delete_cb(block) do |callback|
    callback.errback(&method(:connection_lost_hook))
    callback.context = { :method => __method__, :path => path, :opts => opts }

    request_opts = opts.merge(:callback => callback)
    super(path, request_opts)
  end
end

#exists?(path, opts = {}, &block) ⇒ Boolean

Returns:

  • (Boolean)


147
148
149
# File 'lib/z_k/z_k_event_machine/client.rb', line 147

# Existence check implemented on top of #stat: forwards to #stat with
# +:cb_style => 'exists'+ merged into a copy of +opts+.
#
# @param path znode path, passed through to #stat
# @param opts [Hash] extra options, passed through to #stat
# @param block optional block, passed through to #stat
# @return whatever #stat returns
def exists?(path, opts={}, &block)
  exists_opts = opts.merge(:cb_style => 'exists')
  stat(path, exists_opts, &block)
end

#get(path, opts = {}, &block) ⇒ Callback

get data at path, optionally enabling a watch on the node

Returns:

  • (Callback)

    returns a Callback which is an EM::Deferred (so you can assign callbacks/errbacks) see Callback::Base for discussion



111
112
113
114
115
116
117
# File 'lib/z_k/z_k_event_machine/client.rb', line 111

# Get the data at +path+, optionally enabling a watch on the node.
#
# A get callback is built around the optional +block+; its errback is routed
# to +connection_lost_hook+, the details of this call are stored on it as
# +context+, and the superclass implementation is invoked with the callback
# merged into +opts+ under +:callback+. The caller's +opts+ is not modified.
#
# @param path znode path, passed through to the superclass call
# @param opts [Hash] extra options, passed through to the superclass call
# @param block optional block handed to Callback.new_get_cb
# @return [Callback] the value produced by Callback.new_get_cb; callbacks and
#   errbacks can be assigned to it
def get(path, opts={}, &block)
  Callback.new_get_cb(block) do |callback|
    callback.errback(&method(:connection_lost_hook))
    callback.context = { :method => __method__, :path => path, :opts => opts }

    request_opts = opts.merge(:callback => callback)
    super(path, request_opts)
  end
end

#get_acl(path, opts = {}, &block) ⇒ Object



167
168
169
170
171
172
173
# File 'lib/z_k/z_k_event_machine/client.rb', line 167

# Asynchronous ACL lookup for +path+.
#
# A get_acl callback is built around the optional +block+; its errback is
# routed to +connection_lost_hook+, the details of this call are stored on it
# as +context+, and the superclass implementation is invoked with the callback
# merged into +opts+ under +:callback+. The caller's +opts+ is not modified.
#
# @param path znode path, passed through to the superclass call
# @param opts [Hash] extra options, passed through to the superclass call
# @param block optional block handed to Callback.new_get_acl_cb
# @return whatever Callback.new_get_acl_cb returns
def get_acl(path, opts={}, &block)
  Callback.new_get_acl_cb(block) do |callback|
    callback.errback(&method(:connection_lost_hook))
    callback.context = { :method => __method__, :path => path, :opts => opts }

    request_opts = opts.merge(:callback => callback)
    super(path, request_opts)
  end
end

#on_close ⇒ Deferred::Default

called back once the connection has been closed.

Returns:

  • (Deferred::Default)


52
# File 'lib/z_k/z_k_event_machine/client.rb', line 52

# Declares the :close deferred event. Per the accompanying documentation this
# backs #on_close, which is called back once the connection has been closed.
deferred_event :close

#on_connected ⇒ Deferred::Default

Note:

this is experimental currently. This may or may not fire for the initial connection.

Registers a one-shot callback for the ZOO_CONNECTED_STATE event.

Its purpose is to warn an already-existing client with watches that a connection has been re-established (with session information saved). From the ZooKeeper Programmers’ Guide:

If you are using watches, you must look for the connected watch event.
When a ZooKeeper client disconnects from a server, you will not receive
notification of changes until reconnected. If you are watching for a
znode to come into existence, you will miss the event if the znode is
created and deleted while you are disconnected.

once this deferred has been fired, it will be replaced with a new deferred, so callbacks must be re-registered, and should be re-registered within the callback to avoid missing events

Returns:

  • (Deferred::Default)


40
# File 'lib/z_k/z_k_event_machine/client.rb', line 40

# Declares the :connected deferred event. Per the accompanying documentation
# this backs #on_connected, a one-shot (experimental) callback for the
# ZOO_CONNECTED_STATE event that must be re-registered after each firing.
deferred_event :connected

#on_connection_lost ⇒ Deferred::Default

If we get a ZK::Exceptions::ConnectionLoss exception back from any call, or an EXPIRED_SESSION_STATE event, we will call back any handlers registered here with the exception instance as the argument.

once this deferred has been fired, it will be replaced with a new deferred, so callbacks must be re-registered, and should be re-registered within the callback to avoid missing events

Returns:

  • (Deferred::Default)


20
# File 'lib/z_k/z_k_event_machine/client.rb', line 20

# Declares the :connection_lost deferred event. Per the accompanying
# documentation this backs #on_connection_lost, whose handlers receive the
# exception instance and must be re-registered after each firing.
deferred_event :connection_lost

#session_id ⇒ Fixnum

Returns the underlying connection’s session_id.

Returns:

  • (Fixnum)

    The underlying connection’s session_id



184
185
186
187
# File 'lib/z_k/z_k_event_machine/client.rb', line 184

# Session id of the underlying connection.
#
# @return the connection's +session_id+, or nil when there is no connection
def session_id
  @cnx.session_id if @cnx
end

#session_passwd ⇒ String

Returns the underlying connection’s session passwd (an opaque value).

Returns:

  • (String)

    The underlying connection’s session passwd (an opaque value)



190
191
192
193
# File 'lib/z_k/z_k_event_machine/client.rb', line 190

# Session passwd (an opaque value) of the underlying connection.
#
# @return the connection's +session_passwd+, or nil when there is no connection
def session_passwd
  @cnx.session_passwd if @cnx
end

#set(path, data, opts = {}, &block) ⇒ Object



127
128
129
130
131
132
133
# File 'lib/z_k/z_k_event_machine/client.rb', line 127

# Asynchronous set request for +path+.
#
# A set callback is built around the optional +block+; its errback is routed
# to +connection_lost_hook+, the details of this call are stored on it as
# +context+, and the superclass implementation is invoked with the callback
# merged into +opts+ under +:callback+. The caller's +opts+ is not modified.
#
# @param path znode path, passed through to the superclass call
# @param data payload passed through to the superclass call
# @param opts [Hash] extra options, passed through to the superclass call
# @param block optional block handed to Callback.new_set_cb
# @return whatever Callback.new_set_cb returns
def set(path, data, opts={}, &block)
  Callback.new_set_cb(block) do |callback|
    callback.errback(&method(:connection_lost_hook))
    callback.context = { :method => __method__, :path => path, :data => data, :opts => opts }

    request_opts = opts.merge(:callback => callback)
    super(path, data, request_opts)
  end
end

#set_acl(path, acls, opts = {}, &block) ⇒ Object



175
176
177
178
179
180
181
# File 'lib/z_k/z_k_event_machine/client.rb', line 175

# Asynchronous ACL update for +path+.
#
# A set_acl callback is built around the optional +block+; its errback is
# routed to +connection_lost_hook+, the details of this call are stored on it
# as +context+, and the superclass implementation is invoked with the callback
# merged into +opts+ under +:callback+. The caller's +opts+ is not modified.
#
# @param path znode path, passed through to the superclass call
# @param acls ACLs passed through to the superclass call
# @param opts [Hash] extra options, passed through to the superclass call
# @param block optional block handed to Callback.new_set_acl_cb
# @return whatever Callback.new_set_acl_cb returns
def set_acl(path, acls, opts={}, &block)
  Callback.new_set_acl_cb(block) do |callback|
    callback.errback(&method(:connection_lost_hook))
    callback.context = { :method => __method__, :path => path, :acls => acls, :opts => opts }

    request_opts = opts.merge(:callback => callback)
    super(path, acls, request_opts)
  end
end

#stat(path, opts = {}, &block) ⇒ Object



135
136
137
138
139
140
141
142
143
144
145
# File 'lib/z_k/z_k_event_machine/client.rb', line 135

# Asynchronous stat request for +path+.
#
# The +:cb_style+ option (default 'stat') selects which Callback factory is
# used, via the name "new_<cb_style>_cb"; #exists? uses this to request an
# 'exists' style callback. The option is removed before the remaining options
# are recorded in the callback context and passed to the superclass call.
#
# The callback's errback is routed to +connection_lost_hook+, the details of
# this call are stored on it as +context+, and the superclass implementation
# is invoked with the callback merged into the options under +:callback+.
#
# @param path znode path, passed through to the superclass call
# @param opts [Hash] extra options; +:cb_style+ is consumed here. The caller's
#   hash is left untouched and may be frozen.
# @param block optional block handed to the selected Callback factory
# @return whatever the selected Callback factory returns
def stat(path, opts={}, &block)
  # Work on a private copy: deleting :cb_style from the caller's own hash would
  # silently change it (and raise if it is frozen).
  opts = opts.dup
  cb_style = opts.delete(:cb_style) { |_| 'stat' }

  meth = :"new_#{cb_style}_cb"

  Callback.__send__(meth, block) do |cb|
    cb.errback(&method(:connection_lost_hook))
    cb.context = { :method => __method__, :path => path, :meth => meth, :opts => opts }
    super(path, opts.merge(:callback => cb))
  end
end