SimpleQueues

In the Gang of Four book, one of the first principles stated is “Program to an interface, not an implementation.” When you need a queue, the only operations you need are enqueue and dequeue. Redis (a nice and simple queue server when you need one) has a ton of extra features, but this library doesn’t use them and you shouldn’t have to care about them.
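
To make the "interface, not implementation" point concrete, here is a minimal sketch of an in-memory stand-in that exposes the same two operations. InMemoryQueues and its #dequeue method are hypothetical names used only for illustration; they are not part of this library, and nothing here touches Redis.

```ruby
# Hypothetical in-memory stand-in, NOT part of simple_queues. Callers only
# rely on #enqueue and #dequeue, so the backing store can be swapped.
class InMemoryQueues
  def initialize
    # One Array per queue name, created on first use.
    @queues = Hash.new { |hash, name| hash[name] = [] }
  end

  # Appends the message to the end of the named queue.
  def enqueue(queue_name, message)
    @queues[queue_name.to_s] << message
    self
  end

  # Removes and returns the oldest message, or nil when the queue is empty.
  def dequeue(queue_name)
    @queues[queue_name.to_s].shift
  end
end

queues = InMemoryQueues.new
queues.enqueue :pages_to_crawl, :url => "http://blog.teksol.info/"
first_message = queues.dequeue(:pages_to_crawl)
empty_result  = queues.dequeue(:pages_to_crawl) # nothing left in the queue
```

Code written against these two methods could, in principle, run against either backend, which is handy in tests.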

This library was written and spec’d on Ruby 1.9.2. It is also in use, in production, on JRuby in 1.8 mode.

Exceptions

All underlying exceptions the Redis gem raises are let through. This means you’ll see Errno::ECONNREFUSED, Errno::EAGAIN and friends. Of course, you may also receive ArgumentError if you pass invalid arguments.
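
Because these errors reach your code unchanged, you may want to wrap queue calls yourself. The following is a sketch of one possible approach; with_connection_retries is a hypothetical helper, not something this library provides, and the retry count and delay are arbitrary.

```ruby
# Hypothetical helper, not part of simple_queues: runs the block and retries
# when a connection-level error is raised. Once the attempts are used up,
# the last error is re-raised to the caller.
def with_connection_retries(attempts = 3, delay = 0.5)
  tries = 0
  begin
    yield
  rescue Errno::ECONNREFUSED, Errno::EAGAIN
    tries += 1
    raise if tries >= attempts
    sleep delay
    retry
  end
end

# Intended usage (commented out because it needs a running Redis server):
# with_connection_retries do
#   Queues.enqueue :pages_to_crawl, :url => "http://blog.teksol.info/"
# end
```

Whether retrying is appropriate depends on your application; an ArgumentError, for instance, is deliberately not rescued here since retrying would not help.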

Usage

require "simple_queues"
# Sane defaults:
# * Defaults to a Redis instance at 127.0.0.1:6379, database 0
# * Defaults to the JSON encoder
Queues = SimpleQueues::Redis.new

Queues.enqueue :pages_to_crawl, :url => "http://blog.teksol.info/"
Queues.enqueue :pages_to_crawl, :url => "http://techcrunch.com/"

# In another process

# Redis.new takes an options Hash, not positional host and port arguments
Queues = SimpleQueues::Redis.new(Redis.new(:host => "192.168.1.9", :port => 9281), :encoder => :json)

loop do
  # This blocks until a message is dequeued.
  message = Queues.dequeue_blocking :pages_to_crawl
  process(message)
end

# Alternatively, using a timeout
loop do
  message = Queues.dequeue_with_timeout :pages_to_crawl, 5 # seconds
  if message then
    process(message)
  else
    # Timed out
    break
  end
end
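
One thing to keep in mind with the JSON encoder: a Hash enqueued with Symbol keys will most likely come back with String keys. The sketch below assumes the encoder behaves like a plain JSON generate/parse round trip from Ruby's standard library; the library's actual encoder is not shown in this document.

```ruby
require "json"

# What the producer passes to #enqueue in the example above.
original = { :url => "http://blog.teksol.info/" }

# Assumption: the :json encoder is equivalent to this round trip.
wire_format = JSON.generate(original)
decoded     = JSON.parse(wire_format)

decoded["url"] # the value is found under the String key
decoded[:url]  # nil, because the Symbol key did not survive the round trip
```

This is why consumer code should read messages with String keys, as in message["url"].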

Multiple Queues

Sometimes, you want to dequeue from multiple queues simultaneously, and react appropriately. When that happens, you need to use #on_dequeue:

require "simple_queues"
Queues = SimpleQueues::Redis.new

$running = true
Queues.on_dequeue :crawler_controls do |message|
  $running = false if message["command"] == "quit"
end

Queues.on_dequeue :pages_to_crawl do |message|
  # Handle crawling a page
end

while $running do
  Queues.dequeue_with_timeout 5 # seconds
end

Alternatively, the block you provide to #on_dequeue can accept two parameters and will be provided with the queue name:

require "simple_queues"
# Provides #underscore, #classify, #constantize and friends
require "active_support/inflector"

# Provides #seconds, #minutes and friends
require "active_support/core_ext/numeric/time"

Queues = SimpleQueues::Redis.new

class BaseWorker
  def initialize(message)
    @message = message
  end
end

# Named to match "crawler_controls_worker".classify, which yields "CrawlerControlsWorker"
class CrawlerControlsWorker < BaseWorker
  def run
    $running = false
  end
end

class PagesToCrawlWorker < BaseWorker
  def run
    # Crawl, do your own stuff here
  end
end

def handler(queue_name, message)
  klass = "#{queue_name}_worker".classify.constantize
  klass.new(message).run
end

Queues.on_dequeue :crawler_controls, &method(:handler)
Queues.on_dequeue :pages_to_crawl,   &method(:handler)

# Without this, $running is nil and the loop below never runs
$running = true
while $running
  Queues.dequeue_with_timeout(30.seconds)
end
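
For readers curious how one-parameter and two-parameter handlers can coexist, here is a sketch of a dispatcher that picks the call style from the block's arity. HandlerRegistry and #dispatch are hypothetical names; this is not the library's implementation, only one way the behaviour described above could be achieved.

```ruby
# Hypothetical dispatcher, NOT the library's implementation. It stores one
# handler per queue and decides how to call it based on the block's arity.
class HandlerRegistry
  def initialize
    @handlers = {}
  end

  # Registers the block as the handler for the named queue.
  def on_dequeue(queue_name, &block)
    @handlers[queue_name.to_sym] = block
    self
  end

  # Calls the handler with (queue_name, message) when it takes two
  # parameters, and with (message) otherwise.
  def dispatch(queue_name, message)
    handler = @handlers.fetch(queue_name.to_sym)
    if handler.arity == 2
      handler.call(queue_name.to_sym, message)
    else
      handler.call(message)
    end
  end
end

seen = []
registry = HandlerRegistry.new
registry.on_dequeue(:crawler_controls) { |message| seen << [:one_arg, message] }
registry.on_dequeue(:pages_to_crawl) { |queue_name, message| seen << [queue_name, message] }

registry.dispatch(:crawler_controls, "command" => "quit")
registry.dispatch(:pages_to_crawl, "url" => "http://blog.teksol.info/")
```

A Method object converted with &method(:handler) reports its arity the same way, so the shared handler shown earlier would take the two-parameter branch.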

LICENSE

(The MIT License)

Copyright © 2011 François Beausoleil ([email protected])

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the ‘Software’), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED ‘AS IS’, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.