Class: Beanstalk::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/beanstalk-client/connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(addrs, default_tube = nil) ⇒ Pool

Returns a new instance of Pool.



248
249
250
251
252
253
254
# File 'lib/beanstalk-client/connection.rb', line 248

# Builds a pool for the given server addresses and immediately calls #connect.
#
# @param addrs the collection of server addresses; #connect iterates it with #each
# @param default_tube the tube handed to each new Connection; when truthy it
#   also becomes the only entry of the initial watch list, otherwise the
#   watch list starts as ['default']
def initialize(addrs, default_tube = nil)
  @addrs = addrs
  @default_tube = default_tube
  @watch_list = default_tube ? [default_tube] : ['default']
  connect
end

Instance Attribute Details

#last_conn ⇒ Object

Returns the value of attribute last_conn.



246
247
248
# File 'lib/beanstalk-client/connection.rb', line 246

# Reader for the @last_conn instance variable.
#
# @return the current value of @last_conn (nil if it was never assigned)
def last_conn
  return @last_conn
end

Instance Method Details

#close ⇒ Object

Close all open connections for this pool



365
366
367
368
369
370
371
372
# File 'lib/beanstalk-client/connection.rb', line 365

# Closes every connection held in @connections and leaves the pool empty.
#
# Entries are taken one at a time from the end of @connections.keys. Each
# entry is deleted from the pool *before* its #close is called, so a
# connection whose #close raises is already gone from the pool.
#
# @return [nil]
def close
  until @connections.empty?
    last_addr = @connections.keys.last
    @connections.delete(last_addr).close
  end
end

#connect ⇒ Object



256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/beanstalk-client/connection.rb', line 256

# Opens a Connection for every address in @addrs that is not already in the
# pool, then aligns each new connection's watched tubes with @watch_list.
#
# Error handling per address:
# * Errno::ECONNREFUSED aborts the whole call by raising NotConnected.
# * Any other StandardError is printed to stdout and that address is skipped.
# * Exceptions outside StandardError (Interrupt, SystemExit, NoMemoryError,
#   ...) are deliberately NOT rescued, so signals and exit requests are not
#   swallowed while connecting.
#
# @return [Integer] the number of connections in the pool after the attempt
# @raise [NotConnected] when a server refuses the connection
def connect
  @connections ||= {}
  @addrs.each do |addr|
    begin
      next if @connections.include?(addr)

      # The connection is registered before its tubes are configured.
      conn = @connections[addr] = Connection.new(addr, @default_tube)
      to_ignore = conn.list_tubes_watched - @watch_list
      # Watch first, ignore second: the stale tubes are only dropped after
      # every wanted tube has been added.
      @watch_list.each { |tube| conn.watch(tube) }
      to_ignore.each { |tube| conn.ignore(tube) }
    rescue Errno::ECONNREFUSED
      raise NotConnected
    rescue StandardError => ex
      puts "#{ex.class}: #{ex}"
    end
  end
  @connections.size
end

#ignore(tube) ⇒ Object



324
325
326
327
328
# File 'lib/beanstalk-client/connection.rb', line 324

# Sends :ignore for +tube+ through send_to_all_conns, then refreshes
# @watch_list from send_to_rand_conn(:list_tubes_watched, true).
#
# @param tube the tube to stop watching
# @return whatever send_to_all_conns returned for the :ignore command
def ignore(tube)
  send_to_all_conns(:ignore, tube).tap do
    # Re-read the watch list so the pool's cached copy matches the servers.
    @watch_list = send_to_rand_conn(:list_tubes_watched, true)
  end
end

#last_server ⇒ Object



280
281
282
# File 'lib/beanstalk-client/connection.rb', line 280

# Address of the connection stored in @last_conn.
#
# @return the result of calling #addr on @last_conn
# @raise [NoMethodError] when @last_conn is nil
def last_server
  conn = @last_conn
  conn.addr
end

#list_tube_used ⇒ Object



350
351
352
# File 'lib/beanstalk-client/connection.rb', line 350

# Forwards :list_tube_used through send_to_all_conns.
#
# @return whatever send_to_all_conns returns for that command
def list_tube_used
  send_to_all_conns(:list_tube_used)
end

#list_tubes ⇒ Object



346
347
348
# File 'lib/beanstalk-client/connection.rb', line 346

# Forwards :list_tubes through send_to_all_conns.
#
# @return whatever send_to_all_conns returns for that command
def list_tubes
  send_to_all_conns(:list_tubes)
end

#list_tubes_watched(*args) ⇒ Object



354
355
356
# File 'lib/beanstalk-client/connection.rb', line 354

# Forwards :list_tubes_watched, followed by any extra arguments, through
# send_to_all_conns.
#
# @param args extra arguments passed along unchanged
# @return whatever send_to_all_conns returns for that command
def list_tubes_watched(*args)
  command = [:list_tubes_watched, *args]
  send_to_all_conns(*command)
end

#on_tube(tube, &block) ⇒ Object



301
302
303
# File 'lib/beanstalk-client/connection.rb', line 301

# Forwards :on_tube with +tube+ and the caller's block through
# send_to_rand_conn.
#
# @param tube the tube name passed along with the command
# @return whatever send_to_rand_conn returns
def on_tube(tube, &blk)
  send_to_rand_conn(:on_tube, tube, &blk)
end

#open_connections ⇒ Object



276
277
278
# File 'lib/beanstalk-client/connection.rb', line 276

# Lists the connection objects currently stored in the pool.
#
# @return [Array] the values of @connections
def open_connections
  @connections.values
end

#peek_buried ⇒ Object



382
383
384
# File 'lib/beanstalk-client/connection.rb', line 382

# Forwards :peek_buried through send_to_each_conn_first_res.
#
# @return whatever send_to_each_conn_first_res returns for that command
def peek_buried
  send_to_each_conn_first_res(:peek_buried)
end

#peek_delayed ⇒ Object



378
379
380
# File 'lib/beanstalk-client/connection.rb', line 378

# Forwards :peek_delayed through send_to_each_conn_first_res.
#
# @return whatever send_to_each_conn_first_res returns for that command
def peek_delayed
  send_to_each_conn_first_res(:peek_delayed)
end

#peek_job(id) ⇒ Object



386
387
388
# File 'lib/beanstalk-client/connection.rb', line 386

# Sends :peek_job for +id+ through send_to_all_conns and passes the combined
# result to make_hash.
#
# @param id the job id to look up
# @return whatever make_hash returns for the collected responses
def peek_job(id)
  responses = send_to_all_conns(:peek_job, id)
  make_hash(responses)
end

#peek_ready ⇒ Object



374
375
376
# File 'lib/beanstalk-client/connection.rb', line 374

# Forwards :peek_ready through send_to_each_conn_first_res.
#
# @return whatever send_to_each_conn_first_res returns for that command
def peek_ready
  send_to_each_conn_first_res(:peek_ready)
end

#put(body, pri = 65536, delay = 0, ttr = 120) ⇒ Object

Put a job on the queue.

Parameters:

  • body: the payload of the job (use Beanstalk::Pool#yput / Beanstalk::Job#ybody to automatically serialize your payload with YAML)

  • pri: priority. Default 65536 (lower numbers are more urgent; 0 is the most urgent priority)

  • delay: number of seconds to wait before making the job available for reservation. Default 0.

  • ttr: time in seconds for the job to reappear on the queue (if beanstalk doesn’t hear from a consumer within this time, assume the consumer died and make the job available for someone else to process). Default 120 seconds.



292
293
294
# File 'lib/beanstalk-client/connection.rb', line 292

# Puts a job on the queue by forwarding :put and the job arguments through
# send_to_rand_conn.
#
# @param body the job payload
# @param pri the job priority (defaults to 65536)
# @param delay the delay passed with the job (defaults to 0)
# @param ttr the time-to-run passed with the job (defaults to 120)
# @return whatever send_to_rand_conn returns
def put(body, pri = 65536, delay = 0, ttr = 120)
  job_args = [body, pri, delay, ttr]
  send_to_rand_conn(:put, *job_args)
end

#raw_stats ⇒ Object



330
331
332
# File 'lib/beanstalk-client/connection.rb', line 330

# Forwards :stats through send_to_all_conns, returning the unaggregated
# result.
#
# @return whatever send_to_all_conns returns for that command
def raw_stats
  send_to_all_conns(:stats)
end

#raw_stats_tube(tube) ⇒ Object



338
339
340
# File 'lib/beanstalk-client/connection.rb', line 338

# Forwards :stats_tube for +tube+ through send_to_all_conns, returning the
# unaggregated result.
#
# @param tube the tube whose statistics are requested
# @return whatever send_to_all_conns returns for that command
def raw_stats_tube(tube)
  command = :stats_tube
  send_to_all_conns(command, tube)
end

#remove(conn) ⇒ Object



358
359
360
361
362
# File 'lib/beanstalk-client/connection.rb', line 358

# Drops the pool entry stored under +conn+'s address and closes it.
#
# The object that gets closed is the one found in @connections under
# conn.addr, which need not be the +conn+ argument itself.
#
# @param conn an object responding to #addr
# @return the removed pool entry, or nil when no entry existed for that address
def remove(conn)
  @connections.delete(conn.addr).tap do |removed|
    removed.close if removed
  end
end

#reserve(timeout = nil) ⇒ Object

Reserve a job from the queue.

Parameters

  • timeout - Time (in seconds) to wait for a job to be available. If nil, wait indefinitely.



310
311
312
# File 'lib/beanstalk-client/connection.rb', line 310

# Reserves a job by forwarding :reserve and +timeout+ through
# send_to_rand_conn.
#
# @param timeout the timeout value passed along with the command (nil by default)
# @return whatever send_to_rand_conn returns
def reserve(timeout = nil)
  send_to_rand_conn(:reserve, timeout)
end

#stats ⇒ Object



334
335
336
# File 'lib/beanstalk-client/connection.rb', line 334

# Aggregated statistics: takes the values of #raw_stats and combines them
# with sum_hashes.
#
# @return whatever sum_hashes returns for the collected values
def stats
  per_connection = raw_stats.values
  sum_hashes(per_connection)
end

#stats_tube(tube) ⇒ Object



342
343
344
# File 'lib/beanstalk-client/connection.rb', line 342

# Aggregated statistics for one tube: takes the values of
# #raw_stats_tube(tube) and combines them with sum_hashes.
#
# @param tube the tube whose statistics are requested
# @return whatever sum_hashes returns for the collected values
def stats_tube(tube)
  per_connection = raw_stats_tube(tube).values
  sum_hashes(per_connection)
end

#use(tube) ⇒ Object



314
315
316
# File 'lib/beanstalk-client/connection.rb', line 314

# Forwards :use for +tube+ through send_to_all_conns.
#
# @param tube the tube name passed along with the command
# @return whatever send_to_all_conns returns for that command
def use(tube)
  command = :use
  send_to_all_conns(command, tube)
end

#watch(tube) ⇒ Object



318
319
320
321
322
# File 'lib/beanstalk-client/connection.rb', line 318

# Sends :watch for +tube+ through send_to_all_conns, then refreshes
# @watch_list from send_to_rand_conn(:list_tubes_watched, true).
#
# @param tube the tube to start watching
# @return whatever send_to_all_conns returned for the :watch command
def watch(tube)
  send_to_all_conns(:watch, tube).tap do
    # Re-read the watch list so the pool's cached copy matches the servers.
    @watch_list = send_to_rand_conn(:list_tubes_watched, true)
  end
end

#yput(obj, pri = 65536, delay = 0, ttr = 120) ⇒ Object

Like put, but serialize the object with YAML.



297
298
299
# File 'lib/beanstalk-client/connection.rb', line 297

# Like #put, but forwards :yput (with the object and job arguments) through
# send_to_rand_conn.
#
# @param obj the object to enqueue
# @param pri the job priority (defaults to 65536)
# @param delay the delay passed with the job (defaults to 0)
# @param ttr the time-to-run passed with the job (defaults to 120)
# @return whatever send_to_rand_conn returns
def yput(obj, pri = 65536, delay = 0, ttr = 120)
  job_args = [obj, pri, delay, ttr]
  send_to_rand_conn(:yput, *job_args)
end