EM::Beanstalk

An attempt to wrap portions of the Beanstalk protocol with EventMachine. Every command returns a deferrable object, which will succeed or fail depending on the reply returned from Beanstalk. Every command also accepts a block, which will be used as the succeed callback for that command.

The default errback is to print the error message received.
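
To make the deferrable idea concrete, here is a minimal, self-contained sketch in plain Ruby. ToyDeferrable and toy_delete are hypothetical stand-ins written for this illustration; they are not EM::Beanstalk's or EventMachine's implementation, and only mimic the callback/errback behaviour described above.

```ruby
# Toy stand-in for a deferrable (assumption: illustration only,
# not the real EventMachine or EM::Beanstalk code).
class ToyDeferrable
  def initialize
    @callbacks = []
    @errbacks  = []
  end

  # Register a success handler; returns self so handlers can be chained.
  def callback(&blk)
    @callbacks << blk
    self
  end

  # Register a failure handler; returns self so handlers can be chained.
  def errback(&blk)
    @errbacks << blk
    self
  end

  # Run every success handler with the given arguments.
  def succeed(*args)
    @callbacks.each { |cb| cb.call(*args) }
  end

  # Run every failure handler with the given arguments.
  def fail(*args)
    @errbacks.each { |eb| eb.call(*args) }
  end
end

# Hypothetical command: the block, if given, becomes the success callback.
def toy_delete(_job_id, &blk)
  deferrable = ToyDeferrable.new
  deferrable.callback(&blk) if blk
  deferrable
end

log = []

ok = toy_delete(1) { log << "deleted" }
ok.succeed                      # simulates a DELETED reply

missing = toy_delete(2) { log << "never runs" }
missing.errback { |msg| log << "failed: #{msg}" }
missing.fail("NOT_FOUND")       # simulates a NOT_FOUND reply

puts log.inspect
```

The block passed to the command only runs on success; a failure reply goes to the errback instead.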

One thing to keep in mind: the Beanstalk protocol executes all commands serially. So, if you send a reserve command and there are no jobs, Beanstalk _won’t_ process any of the commands that come after the reserve until the reserve completes.

This is a bit of a gotcha when sending a series of reserve, delete, etc. commands while there are no available jobs.
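
The gotcha can be pictured with a small simulation. ToySerialConnection below is a hypothetical model written for this README, not Beanstalk itself and not the EM::Beanstalk client; it assumes the job that unblocks the reserve arrives from some other producer.

```ruby
# Toy model of a connection that answers commands strictly in order
# (assumption: illustration only, not real Beanstalk behaviour in detail).
class ToySerialConnection
  attr_reader :replies

  def initialize
    @pending = []   # commands sent but not yet answered, oldest first
    @jobs    = []   # jobs ready to be reserved
    @replies = []   # replies produced so far, in order
  end

  # Queue a command and answer whatever can be answered.
  def send_command(name)
    @pending << name
    drain
  end

  # A job showing up from another producer.
  def job_arrives(body)
    @jobs << body
    drain
  end

  private

  # Answer pending commands in order; stop at a reserve with no job available.
  def drain
    until @pending.empty?
      name = @pending.first
      if name == :reserve
        break if @jobs.empty?
        @replies << "RESERVED #{@jobs.shift}"
      else
        @replies << "#{name.to_s.upcase} ok"
      end
      @pending.shift
    end
  end
end

conn = ToySerialConnection.new
conn.send_command(:reserve)   # no jobs yet, so this waits
conn.send_command(:stats)     # stuck behind the reserve
before = conn.replies.dup

conn.job_arrives("hello")     # reserve completes, then stats is answered
after = conn.replies.dup

puts "before: #{before.inspect}"
puts "after:  #{after.inspect}"
```

Until the job arrives, the stats command gets no reply at all, even though it could have been answered immediately on its own.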

Dependencies

- eventmachine

For testing

- rspec
- em-spec

Examples

EM.run {
  jack = EM::Beanstalk.new

  jack.use('mytube') { |tube| puts "Using #{tube}" }

  jack.reserve do |job|
    puts job.id
    process(job)

    jack.delete(job) { puts "Successfully deleted" }
  end

  jack.put("my message", :ttr => 300) { |id| puts "put successful #{id}" }

  jack.stats { |stats| puts "Server up for #{stats['uptime']} seconds" }

  jack.stats(:tube, "mytube") { |stats| puts "Total jobs #{stats['total-jobs']}" }

  jack.list(:tubes) { |tubes| puts "There are #{tubes.length} tubes defined" }
}

If you need custom error handling, you can chain the error event handler onto the command:

Examples

EM.run {
  jack = EM::Beanstalk.new

  jack.delete(job_id) {
    puts "deleted job!"
  }.on_error { |message|
    puts "I couldn't delete the job because of #{message}"
  }
}

EM::Beanstalk#each_job is useful when the client is a job worker that is intended to process jobs continuously as they become available. Once the queue is empty, the client will block and wait for a new job.

If multiple workers connect to the queue, Beanstalkd will round-robin between the workers.

EM.run {
  jack = EM::Beanstalk.new

  jack.each_job do |job|
    puts "Got job ##{job.id}: #{job}"

    if process(job)
      jack.delete(job) { puts "*Deleted #{job}*" }
    else
      # something went horribly wrong!
    end
  end

  def process(job)
    # Some kind of job processing
  end
}
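
The round-robin behaviour mentioned above can be pictured with a toy dispatcher. This is a hypothetical sketch in plain Ruby with made-up worker names; it only illustrates the idea of rotating through waiting workers and makes no claim about how Beanstalkd schedules internally.

```ruby
# Toy dispatcher (assumption: illustration only).
waiting = %w[worker-a worker-b worker-c]  # workers waiting for a job
jobs    = (1..5).to_a                     # job ids arriving in order

assignments = jobs.map do |job_id|
  worker = waiting.shift   # the longest-waiting worker gets the job
  waiting.push(worker)     # once done, it waits again at the back
  [job_id, worker]
end

assignments.each { |job_id, worker| puts "job #{job_id} -> #{worker}" }
```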