Module: Fraggle::Client

Includes:
Logger, Protocol
Included in:
Test::TestClient
Defined in:
lib/fraggle/client.rb

Defined Under Namespace

Classes: Error

Constant Summary collapse

MinTag =
0
MaxTag =
(1<<32)
Nibbles =
"0123456789abcdef"

Constants included from Logger

Logger::DEBUG, Logger::ERROR, Logger::INFO

Instance Attribute Summary

Attributes included from Logger

#level, #writer

Attributes included from Protocol

#last_received

Instance Method Summary collapse

Methods included from Logger

#debug, #error, #info, #log

Methods included from Protocol

#receive_data, #send_data, #send_request

Instance Method Details

#__cancel__(what, &blk) ⇒ Object

Be careful with this. Prefer calling #cancel on the Request that was returned to you; that avoids a race condition in which you cancel an operation other than the one you intended.



177
178
179
180
181
182
183
184
185
186
187
188
189
190
# File 'lib/fraggle/client.rb', line 177

# Sends a CANCEL request targeting the request +what+.
#
# The CANCEL request's id is set to +what+'s tag.  Until a valid response to
# the cancel arrives, +what+'s tag stays in @cbx with no request attached.
#
# what - object responding to #tag (a request previously sent).
# blk  - optional block, called with the response to the cancel.
#
# Returns the value of #send for the CANCEL request.
def __cancel__(what, &blk)
  cancel_req = Request.new.tap do |r|
    r.verb = Request::Verb::CANCEL
    r.id   = what.tag
  end

  # Keep the tag reserved (unavailable for reuse) until the cancel succeeds.
  @cbx[what.tag] = nil

  send(cancel_req) do |res|
    # Release the tag; nothing is routed to the canceled request anymore.
    @cbx.delete(what.tag)
    blk && blk.call(res)
  end
end

#cancelable(req) ⇒ Object



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/fraggle/client.rb', line 225

# Adds #cancel and #canceled? to +req+ (via req.metadef) and returns +req+.
#
# #cancel    - the first call invokes __cancel__ on this client with the
#              request; later calls do nothing and return nil.
# #canceled? - true once #cancel has been called.
def cancelable(req)
  client  = self
  pending = true

  req.metadef :cancel do
    next unless pending

    pending = false
    client.__cancel__(self)
  end

  req.metadef(:canceled?) { !pending }

  req
end

#casify(cas) ⇒ Object



327
328
329
330
331
332
333
# File 'lib/fraggle/client.rb', line 327

# Maps the symbolic cas shorthands to their Response constants.
#
# cas - :missing, :clobber, or any other value.
#
# Returns Response::Missing for :missing, Response::Clobber for :clobber,
# and +cas+ itself for anything else.
def casify(cas)
  return Response::Missing if :missing == cas
  return Response::Clobber if :clobber == cas

  cas
end

#checkin(path, cas, &blk) ⇒ Object



58
59
60
61
62
63
64
65
# File 'lib/fraggle/client.rb', line 58

# Sends a CHECKIN request.
#
# path - value assigned to the request's path.
# cas  - cas value; passed through #casify first.
# blk  - optional block handed to #send.
#
# Returns the value of #send.
def checkin(path, cas, &blk)
  request = Request.new.tap do |r|
    r.verb = Request::Verb::CHECKIN
    r.path = path
    r.cas  = casify(cas)
  end

  send(request, &blk)
end

#del(path, cas, &blk) ⇒ Object



126
127
128
129
130
131
132
133
# File 'lib/fraggle/client.rb', line 126

# Sends a DEL request.
#
# path - value assigned to the request's path.
# cas  - cas value; passed through #casify first.
# blk  - optional block handed to #send.
#
# Returns the value of #send.
def del(path, cas, &blk)
  request = Request.new.tap do |r|
    r.verb = Request::Verb::DEL
    r.path = path
    r.cas  = casify(cas)
  end

  send(request, &blk)
end

#delsnap(sid, &blk) ⇒ Object



159
160
161
162
163
164
165
# File 'lib/fraggle/client.rb', line 159

# Sends a DELSNAP request whose id is +sid+.
#
# sid - value assigned to the request's id.
# blk - optional block handed to #send.
#
# Returns the value of #send.
def delsnap(sid, &blk)
  request = Request.new.tap do |r|
    r.verb = Request::Verb::DELSNAP
    r.id   = sid
  end

  send(request, &blk)
end

#genkey(prefix = nil) ⇒ Object



335
336
337
338
# File 'lib/fraggle/client.rb', line 335

# Generates a key made of 16 characters picked with Kernel#rand from Nibbles.
#
# prefix - optional String; when given (non-nil/non-false) the result is
#          prefix + "." + the 16 generated characters.
#
# Returns the generated String.
def genkey(prefix=nil)
  suffix = Array.new(16) { Nibbles[rand(Nibbles.length)].chr }.join
  return suffix unless prefix

  prefix + "." + suffix
end

#get(sid, path, &blk) ⇒ Object



87
88
89
90
91
92
93
94
# File 'lib/fraggle/client.rb', line 87

# Sends a GET request for +path+.
#
# sid  - assigned to the request's id unless it equals 0.
# path - value assigned to the request's path.
# blk  - optional block handed to #send.
#
# Returns the value of #send.
def get(sid, path, &blk)
  request = Request.new.tap do |r|
    r.verb = Request::Verb::GET
    r.id   = sid unless sid == 0 # wire optimization: a zero id is left unset
    r.path = path
  end

  send(request, &blk)
end

#getdir(sid, path, offset, limit, &blk) ⇒ Object



105
106
107
108
109
110
111
112
113
114
# File 'lib/fraggle/client.rb', line 105

# Sends a GETDIR request for +path+.
#
# sid    - assigned to the request's id unless it equals 0.
# path   - value assigned to the request's path.
# offset - assigned to the request's offset unless it equals 0.
# limit  - assigned to the request's limit unless it equals 0.
# blk    - optional block handed to #send.
#
# Returns the value of #send.
def getdir(sid, path, offset, limit, &blk)
  request = Request.new.tap do |r|
    r.verb   = Request::Verb::GETDIR
    # Zero-valued fields are left unset.
    r.id     = sid    unless sid == 0
    r.offset = offset unless offset == 0
    r.limit  = limit  unless limit == 0
    r.path   = path
  end

  send(request, &blk)
end

#initialize(addrs) ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
# File 'lib/fraggle/client.rb', line 23

# Sets up the client's state.
#
# addrs - Array of addresses.  NOTE: it is modified in place: the first
#         element is shifted off and becomes @addr, and the same Array
#         object (not a copy) is retained as @init.
def initialize(addrs)
  # Bookkeeping tables, all initially empty.
  @cbx   = {}
  @shun  = {}
  @addrs = {}

  # First address is the one in use; the remainder is kept in @init.
  @addr = addrs.shift
  @init = addrs

  # Logging: level ERROR, written to $stderr.
  @level  = ERROR
  @writer = $stderr
end

#noop(&blk) ⇒ Object



167
168
169
170
171
172
# File 'lib/fraggle/client.rb', line 167

# Sends a NOOP request.
#
# blk - optional block handed to #send.
#
# Returns the value of #send.
def noop(&blk)
  request = Request.new.tap { |r| r.verb = Request::Verb::NOOP }

  send(request, &blk)
end

#post_init ⇒ Object



243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/fraggle/client.rb', line 243

# Sets up a freshly connected client (#unbind also calls this directly after
# reconnecting):
#
# * starts a periodic ping timer (every 2 seconds) that closes the
#   connection when nothing has been received for 3 seconds or more;
# * watches and walks "/doozer/slot/*" to maintain @addrs, a map of slot
#   path => public address;
# * clears the initial address list (@init) once the walk is done.
def post_init
  info "successfully connected to #{@addr}"

  @last_received = Time.now

  # Keep the timer handle in @timer: #unbind passes @timer to
  # EM.cancel_timer, which only works if the handle is stored here.
  @timer = EM.add_periodic_timer(2) do
    if Time.now - last_received >= 3
      error("timeout talking to #{@addr}")
      close_connection
    else
      debug("ping")
      get(0, "/ping") { debug("pong") }
    end
  end

  # Handler shared by the watch and the walk on the slot entries.
  waw = Proc.new do |e|
    if e.value == ""
      # An empty slot value is treated as "slot vacated".  @addrs is keyed by
      # the slot path (see the assignment below), so remove by path.
      addr = @addrs.delete(e.path)
      error "noticed #{addr} is gone; removing" if addr
    else
      # The slot value names the info entry whose public-addr we look up.
      get 0, "/doozer/info/#{e.value}/public-addr" do |a|
        addr = a.value.to_s
        if @shun.has_key?(addr)
          # Shunned addresses are pardoned once more than 3 seconds have
          # passed since they were shunned; until then they are skipped.
          if (n = Time.now - @shun[addr]) > 3
            info "pardoning #{addr} after #{n} secs"
            @shun.delete(addr)
          else
            info "ignoring shunned addr #{addr}"
            next
          end
        end
        # TODO: Be defensive and check the addr value is valid
        @addrs[e.path] = addr
        info("added #{e.path} addr #{addr}")
      end
    end
  end

  watch("/doozer/slot/*", &waw)

  w = walk(0, "/doozer/slot/*", &waw)
  w.done do
    # We have the best known addrs;  We can clear the initial
    # ones given at inception.
    debug "addrs list complete; clearing init addrs"
    @init.clear
  end
end

#receive_response(res) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/fraggle/client.rb', line 35

# Routes a response to the request registered under its tag in @cbx.
#
# * If res.err_code is set and a request is registered for the tag, the
#   request is unregistered, receives :error with the response, and nothing
#   else happens.
# * Otherwise, when the VALID flag is set the registered request (if any)
#   receives :valid with the response; when the DONE flag is set the tag is
#   unregistered and the request (if any) receives :done.
#
# res - the response; must respond to #tag, #flags and #err_code.
def receive_response(res)
  debug "received response: #{res.inspect}"

  if res.err_code && (failed = @cbx.delete(res.tag))
    failed.emit(:error, res)
    return
  end

  if (res.flags & Response::Flag::VALID) > 0
    # Look up without removing: more responses may follow for this tag.
    live = @cbx[res.tag]
    live.emit(:valid, res) if live
  end

  return unless (res.flags & Response::Flag::DONE) > 0

  # DONE releases the tag, even when no request object is attached to it.
  finished = @cbx.delete(res.tag)
  finished.emit(:done) if finished
end

#send(req, &blk) ⇒ Object



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# File 'lib/fraggle/client.rb', line 192

# Tags +req+, registers it in @cbx and writes it out with #send_request.
#
# The tag is the first value, counting up from MinTag (wrapping back to
# MinTag after MaxTag), that is not currently a key of @cbx.
#
# req - the request to send.
# blk - optional block, registered through req.valid.
#
# A default error handler is registered through req.error; when invoked it
# raises Error describing the error code, detail and the request.
#
# Returns +req+.
def send(req, &blk)
  tag = MinTag
  tag = (tag >= MaxTag ? MinTag : tag + 1) while @cbx.has_key?(tag)

  req.tag = tag

  req.valid(&blk) if blk

  # Default error handler: fail loudly with useful information.
  req.error do |e|
    detail = format(
      "'error (%d) (%s)' for: %s",
      e.err_code,
      e.err_detail.inspect,
      req.inspect
    )
    raise Error, detail
  end

  @cbx[req.tag] = req

  debug "sending request:   #{req.inspect}"
  send_request(req)

  req
end

#session(prefix = nil, &blk) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/fraggle/client.rb', line 67

# Establishes a session and keeps it going by checking in repeatedly.
#
# A key is generated with #genkey(prefix) and checked in with cas 0.  Every
# time a check-in produces a response, a new check-in for the same key is
# sent using that response's cas.
#
# prefix - optional prefix passed to #genkey.
# blk    - optional block, called once with no arguments when the first
#          response arrives.  Omitting it is allowed.
#
# Returns the value of the initial #checkin call.
def session(prefix=nil, &blk)
  name  = genkey(prefix)
  estab = false

  f = Proc.new do |e|
    # If this is the first response from the server, it's go-time.
    # The block is optional, so only call it when one was given.
    blk.call if blk && !estab

    # We've successfully established a session.  Say so.
    estab = true

    # Get back to the server ASAP
    checkin(name, e.cas, &f)
  end

  checkin(name, 0, &f)
end

#set(path, value, cas, &blk) ⇒ Object



116
117
118
119
120
121
122
123
124
# File 'lib/fraggle/client.rb', line 116

# Sends a SET request.
#
# path  - value assigned to the request's path.
# value - value assigned to the request's value.
# cas   - cas value; passed through #casify first.
# blk   - optional block handed to #send.
#
# Returns the value of #send.
def set(path, value, cas, &blk)
  request = Request.new.tap do |r|
    r.verb  = Request::Verb::SET
    r.path  = path
    r.value = value
    r.cas   = casify(cas)
  end

  send(request, &blk)
end

#snap(&blk) ⇒ Object



152
153
154
155
156
157
# File 'lib/fraggle/client.rb', line 152

# Sends a SNAP request.
#
# blk - optional block handed to #send.
#
# Returns the value of #send.
def snap(&blk)
  request = Request.new.tap { |r| r.verb = Request::Verb::SNAP }

  send(request, &blk)
end

#stat(sid, path, &blk) ⇒ Object



96
97
98
99
100
101
102
103
# File 'lib/fraggle/client.rb', line 96

# Sends a STAT request for +path+.
#
# sid  - assigned to the request's id unless it equals 0.
# path - value assigned to the request's path.
# blk  - optional block handed to #send.
#
# Returns the value of #send.
def stat(sid, path, &blk)
  request = Request.new.tap do |r|
    r.verb = Request::Verb::STAT
    r.id   = sid unless sid == 0 # wire optimization: a zero id is left unset
    r.path = path
  end

  send(request, &blk)
end

#unbind ⇒ Object

What happens when a connection is closed for any reason.



295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
# File 'lib/fraggle/client.rb', line 295

# What happens when a connection is closed for any reason: the current
# address is shunned and another one is tried.
#
# The next address comes from @addrs first; if that yields nothing, from the
# addresses given at inception (@init).  After reconnecting, #post_init is
# called again.
#
# Raises RuntimeError ("No more doozers!") when no address is left.
def unbind
  error "disconnected from #{@addr}"

  # Shun the address we were attempting/connected to, and forget every
  # known slot that points at it.
  @shun[@addr] = Time.now
  @addrs.reject! { |_, known| known == @addr }

  # Stop the timer so it cannot race the reconnect; #post_init below is
  # where a timer gets started again.
  EM.cancel_timer(@timer)

  # Prefer an addr learned from a Doozer; as a last resort fall back to one
  # of the addresses given at inception.
  _, @addr = @addrs.shift
  @addr ||= @init.shift

  # We are all out of addresses to try
  raise "No more doozers!" unless @addr

  host, port = @addr.split(":")
  info "attempting reconnect to #{host}:#{port}"
  reconnect(host, port.to_i)
  post_init
end

#walk(sid, glob, &blk) ⇒ Object



135
136
137
138
139
140
141
142
# File 'lib/fraggle/client.rb', line 135

# Sends a WALK request for +glob+ and makes the request cancelable.
#
# sid  - assigned to the request's id unless it equals 0.
# glob - value assigned to the request's path.
# blk  - optional block handed to #send.
#
# Returns the sent request after it has been passed through #cancelable.
def walk(sid, glob, &blk)
  request = Request.new.tap do |r|
    r.verb = Request::Verb::WALK
    r.id   = sid unless sid == 0 # wire optimization: a zero id is left unset
    r.path = glob
  end

  sent = send(request, &blk)
  cancelable(sent)
end

#watch(glob, &blk) ⇒ Object



144
145
146
147
148
149
150
# File 'lib/fraggle/client.rb', line 144

# Sends a WATCH request for +glob+ and makes the request cancelable.
#
# glob - value assigned to the request's path.
# blk  - optional block handed to #send.
#
# Returns the sent request after it has been passed through #cancelable.
def watch(glob, &blk)
  request = Request.new.tap do |r|
    r.verb = Request::Verb::WATCH
    r.path = glob
  end

  sent = send(request, &blk)
  cancelable(sent)
end