Class: Quebert::Backend::Beanstalk

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Includes:
Logging
Defined in:
lib/quebert/backend/beanstalk.rb

Overview

Manage jobs on a Beanstalk queue out of process

Constant Summary collapse

TTR_BUFFER =

A buffer time, in seconds, added to the Beanstalk TTR so that Quebert can do its own job cleanup. The job will perform based on the Beanstalk TTR, but Beanstalk hangs on to the job just a little longer so that Quebert can bury the job or schedule a retry with the appropriate delay.

5

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, queue) ⇒ Beanstalk


20
21
22
23
24
# File 'lib/quebert/backend/beanstalk.rb', line 20

# Set up a backend for one server address and one default queue name.
#
# @param host [Object] kept as-is in @host (presumably a "host:port"
#   string like the default used by .configure — confirm with callers)
# @param queue [Object] kept as-is in @queue; #put falls back to it
#   when a job does not name its own queue
def initialize(host, queue)
  @host, @queue = host, queue
  # Starts out empty; can be replaced later through #queues=.
  @queues = []
end

Instance Attribute Details

#host ⇒ Object (readonly)

Returns the value of attribute host


17
18
19
# File 'lib/quebert/backend/beanstalk.rb', line 17

# Reader for the host given to the constructor.
#
# @return [Object] the current value of @host
def host; @host; end

#queue ⇒ Object (readonly)

Returns the value of attribute queue


17
18
19
# File 'lib/quebert/backend/beanstalk.rb', line 17

# Reader for the default queue given to the constructor.
#
# @return [Object] the current value of @queue
def queue; @queue; end

#queues=(value) ⇒ Object

Sets the attribute queues


18
19
20
# File 'lib/quebert/backend/beanstalk.rb', line 18

# Writer that replaces whatever is currently held in @queues.
#
# @param value [Object] the new value (presumably a list of queue names —
#   confirm against the code that reads @queues)
# @return [Object] the assigned value
def queues=(value); @queues = value; end

Class Method Details

.configure(opts = {}) ⇒ Object


26
27
28
# File 'lib/quebert/backend/beanstalk.rb', line 26

# Build a backend from an options hash.
#
# @param opts [Hash] recognised keys:
#   :host  - optional, defaults to "127.0.0.1:11300"
#   :queue - required
# @return [Object] whatever `new(host, queue)` returns
# @raise [KeyError] when :queue is absent from opts
def self.configure(opts = {})
  host = opts.fetch(:host, "127.0.0.1:11300")
  # No default on purpose: a missing :queue surfaces as a KeyError here.
  queue = opts.fetch(:queue)
  new(host, queue)
end

Instance Method Details

#drain! ⇒ Object

For testing purposes… I think there's a better way to do this though.


40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/quebert/backend/beanstalk.rb', line 40

# Empty the queue by reserving and deleting jobs for as long as `peek`
# reports one, going through the ready, delayed and buried states in
# that order. Described upstream as a testing aid.
#
# NOTE(review): jobs are reserved with no timeout, so draining delayed
# jobs presumably waits for them to become ready — confirm.
#
# @return [nil]
def drain!
  [:ready, :delayed, :buried].each do |state|
    while peek(state)
      # A buried job is kicked first so that the following reserve can get it.
      kick if state == :buried
      reserve_without_controller.delete
    end
  end
  nil
end

#put(job) ⇒ Object


56
57
58
59
60
61
62
# File 'lib/quebert/backend/beanstalk.rb', line 56

# Enqueue a job on its tube.
#
# The tube is looked up by the job's own queue, falling back to this
# backend's default queue when the job has none. The job is sent as its
# JSON form with its priority and delay, and its TTR padded by TTR_BUFFER.
#
# @param job [#queue, #to_json, #priority, #delay, #ttr] the job to enqueue
# @return [Object] whatever the tube's `put` returns
def put(job)
  tubes = beanstalkd_tubes
  target = tubes[job.queue || queue]
  payload = job.to_json
  target.put(payload,
             pri: job.priority,
             delay: job.delay,
             ttr: job.ttr + TTR_BUFFER)
end

#reserve(timeout = nil) ⇒ Object


35
36
37
# File 'lib/quebert/backend/beanstalk.rb', line 35

# Reserve the next job and wrap it in a Controller::Beanstalk.
#
# @param timeout [Object, nil] passed straight through to
#   #reserve_without_controller
# @return [Controller::Beanstalk] controller built around the reserved job
def reserve(timeout = nil)
  reserved = reserve_without_controller(timeout)
  Controller::Beanstalk.new(reserved)
end

#reserve_without_controller(timeout = nil) ⇒ Object


30
31
32
33
# File 'lib/quebert/backend/beanstalk.rb', line 30

# Reserve the next job without wrapping it in a controller.
#
# Calls `watch_tubes` first, then asks `beanstalkd_tubes` to reserve.
#
# @param timeout [Object, nil] forwarded unchanged to the tubes' `reserve`
# @return [Object] whatever the tubes' `reserve` returns
def reserve_without_controller(timeout = nil)
  watch_tubes
  tubes = beanstalkd_tubes
  tubes.reserve(timeout)
end