Class: Quebert::Backend::Beanstalk
- Inherits:
-
Object
- Object
- Quebert::Backend::Beanstalk
- Extended by:
- Forwardable
- Includes:
- Logging
- Defined in:
- lib/quebert/backend/beanstalk.rb
Overview
Manage jobs on a Beanstalk queue out of process
Constant Summary collapse
- TTR_BUFFER =
A buffer time in seconds added to the Beanstalk TTR for Quebert to do its own job cleanup The job will perform based on the Beanstalk TTR, but Beanstalk hangs on to the job just a little longer so that Quebert can bury the job or schedule a retry with the appropriate delay
5
Instance Attribute Summary collapse
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#queues ⇒ Object
writeonly
Sets the attribute queues.
Class Method Summary collapse
Instance Method Summary collapse
-
#drain! ⇒ Object
For testing purposes…
-
#initialize(host, queue) ⇒ Beanstalk
constructor
A new instance of Beanstalk.
- #put(job) ⇒ Object
- #reserve(timeout = nil) ⇒ Object
- #reserve_without_controller(timeout = nil) ⇒ Object
Constructor Details
#initialize(host, queue) ⇒ Beanstalk
Returns a new instance of Beanstalk.
20 21 22 23 24 |
# File 'lib/quebert/backend/beanstalk.rb', line 20 def initialize(host, queue) @host = host @queue = queue @queues = [] end |
Instance Attribute Details
#host ⇒ Object (readonly)
Returns the value of attribute host.
17 18 19 |
# File 'lib/quebert/backend/beanstalk.rb', line 17 def host @host end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
17 18 19 |
# File 'lib/quebert/backend/beanstalk.rb', line 17 def queue @queue end |
#queues=(value) ⇒ Object
Sets the attribute queues
18 19 20 |
# File 'lib/quebert/backend/beanstalk.rb', line 18 def queues=(value) @queues = value end |
Class Method Details
.configure(opts = {}) ⇒ Object
26 27 28 |
# File 'lib/quebert/backend/beanstalk.rb', line 26 def self.configure(opts = {}) new(opts.fetch(:host, "127.0.0.1:11300"), opts.fetch(:queue)) end |
Instance Method Details
#drain! ⇒ Object
For testing purposes… I think there’s a better way to do this though.
40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/quebert/backend/beanstalk.rb', line 40 def drain! while peek(:ready) do reserve_without_controller.delete end while peek(:delayed) do reserve_without_controller.delete end while peek(:buried) do kick reserve_without_controller.delete end end |
#put(job) ⇒ Object
56 57 58 59 60 61 62 |
# File 'lib/quebert/backend/beanstalk.rb', line 56 def put(job) tube = beanstalkd_tubes[job.queue || queue] tube.put(job.to_json, :pri => job.priority, :delay => job.delay, :ttr => job.ttr + TTR_BUFFER) end |
#reserve(timeout = nil) ⇒ Object
35 36 37 |
# File 'lib/quebert/backend/beanstalk.rb', line 35 def reserve(timeout=nil) Controller::Beanstalk.new(reserve_without_controller(timeout)) end |
#reserve_without_controller(timeout = nil) ⇒ Object
30 31 32 33 |
# File 'lib/quebert/backend/beanstalk.rb', line 30 def reserve_without_controller(timeout=nil) watch_tubes beanstalkd_tubes.reserve(timeout) end |