Class: Quebert::Backend::Beanstalk

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Logging
Defined in:
lib/quebert/backend/beanstalk.rb

Overview

Manage jobs on a Beanstalk queue out of process

Constant Summary collapse

TTR_BUFFER =

A buffer time in seconds added to the Beanstalk TTR for Quebert to do its own job cleanup The job will perform based on the Beanstalk TTR, but Beanstalk hangs on to the job just a little longer so that Quebert can bury the job or schedule a retry with the appropriate delay

5

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, queue) ⇒ Beanstalk

Returns a new instance of Beanstalk.



20
21
22
23
24
# File 'lib/quebert/backend/beanstalk.rb', line 20

def initialize(host, queue)
  @host = host
  @queue = queue
  @queues = []
end

Instance Attribute Details

#hostObject (readonly)

Returns the value of attribute host.



17
18
19
# File 'lib/quebert/backend/beanstalk.rb', line 17

def host
  @host
end

#queueObject (readonly)

Returns the value of attribute queue.



17
18
19
# File 'lib/quebert/backend/beanstalk.rb', line 17

def queue
  @queue
end

#queues=(value) ⇒ Object

Sets the attribute queues

Parameters:

  • value

    the value to set the attribute queues to.



18
19
20
# File 'lib/quebert/backend/beanstalk.rb', line 18

def queues=(value)
  @queues = value
end

Class Method Details

.configure(opts = {}) ⇒ Object



26
27
28
# File 'lib/quebert/backend/beanstalk.rb', line 26

def self.configure(opts = {})
  new(opts.fetch(:host, "127.0.0.1:11300"), opts.fetch(:queue))
end

Instance Method Details

#drain!Object

For testing purposes… I think there’s a better way to do this though.



40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/quebert/backend/beanstalk.rb', line 40

def drain!
  while peek(:ready) do
    reserve_without_controller.delete
  end
  while peek(:delayed) do
    reserve_without_controller.delete
  end
  while peek(:buried) do
    kick
    reserve_without_controller.delete
  end
end

#put(job) ⇒ Object



56
57
58
59
60
61
62
# File 'lib/quebert/backend/beanstalk.rb', line 56

def put(job)
  tube = beanstalkd_tubes[job.queue || queue]
  tube.put(job.to_json,
    :pri => job.priority,
    :delay => job.delay,
    :ttr => job.ttr + TTR_BUFFER)
end

#reserve(timeout = nil) ⇒ Object



35
36
37
# File 'lib/quebert/backend/beanstalk.rb', line 35

def reserve(timeout=nil)
  Controller::Beanstalk.new(reserve_without_controller(timeout))
end

#reserve_without_controller(timeout = nil) ⇒ Object



30
31
32
33
# File 'lib/quebert/backend/beanstalk.rb', line 30

def reserve_without_controller(timeout=nil)
  watch_tubes
  beanstalkd_tubes.reserve(timeout)
end