FIFO

FIFO is a Ruby queueing library built on top of Amazon SQS (Simple Queue Service). Like DelayedJob, it encapsulates the common pattern of executing time-consuming tasks in the background, but unlike DelayedJob, it doesn’t rely on a database.

Features

  • Built on Amazon’s reliable and scalable queue service.
  • Connections to SQS are opened lazily, avoiding any initial load time.
  • Use multiple queues with ease.
  • Doesn’t poll the database.
  • Rails ActiveRecord objects maintain state through the queue.
  • Built-in retry mechanism.

Uses

FIFO is extracted from the Mine (http://getmine.com) codebase. Here are some of the things we use it for:

  • Sending emails
  • Processing images
  • Indexing
  • Sharing to social networks
  • Cache management
  • Launching cron jobs
  • Communicating between disjoint parts of a complex application (cron, web, processing servers)

Installation


Pending

Usage

Credentials

Set up an AWS access ID and an AWS secret key. They can be found here: https://portal.aws.amazon.com/gp/aws/securityCredentials


FIFO::QueueManager.setup <aws_access_id>, <aws_secret_key>

Initialization

Create a queue object. If a queue by this name doesn’t exist on the Amazon SQS account, this will create one.

For Rails, this can be added to a file in “config/initializers/”. Additionally, queue names can include the current Rails environment name so that each environment uses its own queue.

The constructor also accepts an additional parameter for the SQS message visibility timeout. For more details see: http://docs.amazonwebservices.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/Welcome.html


queue = FIFO::Queue.new <queue_name>
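
As a rough illustration of the environment-specific naming mentioned above, a small helper could build the queue name. The helper name queue_name_for is hypothetical and not part of FIFO; in a Rails initializer the environment would typically come from Rails.env.

```ruby
# Hypothetical helper (not part of FIFO): builds an environment-scoped queue name.
def queue_name_for(base, env)
  "#{base}_#{env}"
end

# In a Rails initializer, env would typically be Rails.env.
puts queue_name_for("emails", "development")   # prints "emails_development"
```

The resulting string would then be passed wherever <queue_name> appears in the examples.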

To prevent delays at the start of the application, SQS connections are opened lazily. This saves the overhead when initializing multiple queues:


queues = []
(1..10).each do |i|
  queues << FIFO::Queue.new <queue_name_i>
end
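
The idea of opening connections lazily can be sketched with plain Ruby memoization. This is an illustration only: LazyQueue, connected? and the array used as a "connection" are stand-ins invented for this example, and FIFO's actual internals may differ.

```ruby
# Illustration only: LazyQueue is a stand-in, not FIFO's implementation.
class LazyQueue
  attr_reader :name

  def initialize(name)
    @name = name
    @connection = nil   # nothing is opened at construction time
  end

  def connected?
    !@connection.nil?
  end

  def push(item)
    connection << item
  end

  private

  # Opened on first use and memoized afterwards.
  def connection
    @connection ||= []  # stand-in for an expensive network connection
  end
end

queues = (1..10).map { |i| LazyQueue.new("queue_#{i}") }
puts queues.count(&:connected?)   # prints 0: no connections opened yet
queues.first.push("job")
puts queues.count(&:connected?)   # prints 1: only the queue that was used
```

Creating ten queues costs nothing up front; the cost of connecting is paid only by queues that are actually used.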

Insertion

The most basic case takes a Ruby object, a method to be called, and a list of method parameters. Here is a Ruby command, followed by the equivalent call that runs it in the background:


"World".insert 0,"Hello "

queue.push "World",:insert,0,"Hello "
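
Conceptually, an object, a method name, and a list of arguments are enough to replay the call later. The sketch below shows that dispatch with a hypothetical SketchPayload struct; it is not FIFO::Payload, only a minimal stand-in to show the idea.

```ruby
# Hypothetical stand-in for a payload: records a receiver, a method name, and arguments.
SketchPayload = Struct.new(:object, :method_name, :args) do
  # Replays the recorded call on the stored object.
  def process
    object.public_send(method_name, *args)
  end
end

# "World".dup gives a mutable string, since String#insert modifies its receiver.
payload = SketchPayload.new("World".dup, :insert, [0, "Hello "])
puts payload.process   # prints "Hello World"
```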

FIFO is designed to work with ActiveRecord objects. Freshness of ActiveRecord objects is ensured by reloading them from the database when they’re popped from the queue. Consider an instance method to process images:


user.process_profile_image({:max_width => 200,:max_height => 150})

queue.push user,:process_profile_image,{:max_width => 200,:max_height => 150}
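
One way to get the reload-on-pop behaviour described above is to enqueue a reference to the record (class name and id) instead of a snapshot, and look it up again when the job runs. The sketch below assumes that approach; SketchUser, to_reference and from_reference are hypothetical names, and the ROWS hash stands in for a database table. FIFO's own mechanism may differ.

```ruby
# In-memory stand-in for an ActiveRecord model; ROWS plays the role of the table.
class SketchUser
  ROWS = {}
  attr_reader :id
  attr_accessor :name

  def initialize(id, name)
    @id = id
    @name = name
  end

  def save
    ROWS[id] = { :name => name }
    self
  end

  # Builds a new object from the stored row, like a fresh database read.
  def self.find(id)
    new(id, ROWS.fetch(id)[:name])
  end
end

# Store a reference rather than a copy of the object.
def to_reference(record)
  { :class_name => record.class.name, :id => record.id }
end

# Look the record up again so the worker sees its current state.
def from_reference(ref)
  Object.const_get(ref[:class_name]).find(ref[:id])
end

user = SketchUser.new(1, "old name").save
ref = to_reference(user)

# The row changes while the job waits in the queue.
fresh = SketchUser.find(1)
fresh.name = "new name"
fresh.save

puts user.name                  # prints "old name" (stale in-memory copy)
puts from_reference(ref).name   # prints "new name" (reloaded copy)
```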

FIFO also works with class methods. For example, sending newsletters:


UserMailer.deliver_newsletter emails

queue.push UserMailer,:deliver_newsletter,emails

Processing

Every item pushed into the queue is converted into a FIFO::Payload object which executes the passed method.

Basic flow of processing an item from the queue:


queue.push "Hello World",:length
payload = queue.pop
payload.process
=> 11

Retry functionality takes a failed payload and adds it back into the queue. Payload objects track the total number of processing attempts.


queue.push "Hello World",:length

# Pop outside the begin block so payload is always set when rescue runs
payload = queue.pop

begin
  payload.process
rescue => ex
  payload.retry if payload.attempts < 3
end
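
To see how an attempt counter bounds the number of retries, here is a self-contained sketch that uses an in-memory array in place of SQS. SketchRetryPayload is a hypothetical stand-in for FIFO::Payload and only mimics the process, retry, and attempts methods used above.

```ruby
# Stand-in payload: counts attempts and can put itself back on an in-memory queue.
class SketchRetryPayload
  attr_reader :attempts

  def initialize(queue, &work)
    @queue = queue
    @work = work
    @attempts = 0
  end

  def process
    @attempts += 1
    @work.call
  end

  def retry
    @queue.push(self)
  end
end

queue = []
calls = 0

# This job fails twice, then succeeds on the third attempt.
queue.push(SketchRetryPayload.new(queue) do
  calls += 1
  raise "temporary failure" if calls < 3
  "done"
end)

result = nil
while (payload = queue.shift)
  begin
    result = payload.process
  rescue StandardError
    payload.retry if payload.attempts < 3
  end
end

puts result   # prints "done"
puts calls    # prints 3
```

A job that keeps failing is dropped after its third attempt, because the guard stops re-queueing it.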

Process all items in the queue:


while(true)
  payload = queue.pop
  break unless payload
  
  begin
    payload.process
  rescue => ex
    payload.retry if payload.attempts < 3
  end
end

Multiple queues can be used to simulate priority:


while(true)
  payload = queue.pop
  payload = queue1.pop unless payload
  payload = queue2.pop unless payload
  
  # All queues may be empty, so only process when a payload was found
  if payload
    begin
      payload.process
    rescue => ex
      payload.retry if payload.attempts < 3
    end
  end

  sleep 10
end
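
The priority pattern above generalises to any number of queues: check them in order and take the first item found. The sketch below uses arrays as stand-in queues, with shift playing the role of pop; pop_by_priority is a hypothetical helper, not part of FIFO.

```ruby
# Returns the first item found, checking queues in priority order.
# Arrays stand in for queues here; shift plays the role of pop.
def pop_by_priority(queues)
  queues.each do |q|
    item = q.shift
    return item if item
  end
  nil   # every queue was empty
end

high = ["urgent"]
low = ["routine-1", "routine-2"]

order = []
while (item = pop_by_priority([high, low]))
  order << item
end

puts order.inspect   # prints ["urgent", "routine-1", "routine-2"]
```

Lower-priority queues are only consulted when every higher-priority queue is empty, so a busy high-priority queue can starve the others.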

The Payload flow can be entirely skipped by popping the raw response from the queue:


queue.pop :raw => true

Daemon

Installation

sudo gem install daemons

Install daemon_generator from: https://github.com/dougal/daemon_generator

./script/generate daemon <name>

Source

require File.dirname(__FILE__) + "/../../config/environment"

$running = true
Signal.trap("TERM") do 
  $running = false
end

FIFO::QueueManager.setup <aws_access_id>, <aws_secret_key>

queue = FIFO::Queue.new <queue_name> 
logger = Logger.new(File.join(RAILS_ROOT,"log/<name>.rb.log"))


while($running) do
  payload = queue.pop

  if payload

    begin
      start_time = Time.now
      payload.process
      end_time = Time.now

      logger.info "Finished #{payload.to_s} #{end_time - start_time}"

    rescue => ex
      if payload.attempts < 3
        logger.info "Recovering #{payload.to_s}"
        payload.retry
      else
        # Out of retries: log the exception
        logger.error "Failed #{payload.to_s}: #{ex.message}"
      end
    end

  end 
  
  sleep 5
end

To control the state of the daemon:


lib/daemon/<name>_ctl <start|stop|run>

SQS

Things to know about Amazon’s SQS:

  • The order of items isn’t always maintained. Delivery is not strictly FIFO, but it is close enough for most uses.
  • There are multiple connection modes to SQS: per_request, per_thread, single
  • Developer guide: http://docs.amazonwebservices.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/Welcome.html

Credit

FIFO is built by the wonderful people who make Mine: http://getmine.com