FIFO
FIFO is a ruby queueing library built on top of Amazon SQS (Simple Queue Service). Like DelayedJob it encapsulates the common pattern of executing time consuming tasks in the background but unlike DelayedJob it doesn’t rely on a database.
Features
- Built on Amazon’s reliable and scalable queue service.
- Connection to SQS are opened lazily negating any initial load time.
- Use multiple queues with ease.
- Doesn’t poll the database.
- Rails ActiveRecord objects maintain state through the queue.
- Built-in retry mechanism.
Uses
FIFO is extracted from the Mine (http://getmine.com) codebase. Here are some of the things we use it for:
- Sending emails
- Processing images
- Indexing
- Sharing to social networks
- Cache management
- Launching cron jobs
- Communicate between disjoint parts of a complex application (cron, web, processing servers)
Installation
Pending
Usage
Credentials
Setup an aws access id and aws secret key. They can be found here: https://portal.aws.amazon.com/gp/aws/securityCredentials
FIFO::QueueManager.setup <aws_access_id>, <aws_secret_key>
Initialization
Create a queue object. If an queue by this name doesn’t exist on the Amazon SQS account this will create one.
For Rails, this can be added to a file in “config/initializers/”. Additionally, queue names can include the current Rails environment name for modularity.
There is an additional parameter for message visibility. For more details see: http://docs.amazonwebservices.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/Welcome.html
queue = FIFO::Queue.new <queue_name>
To prevent delays at the start of the application SQS connections are opened lazily. This saves the overheard when initialzing multiple queues:
queues = []
(1..10).each do |i|
queues << FIFO::Queue.new <queue_name_i>
end
Insertion
The most basic case takes a Ruby object, a method to be called and a list of method parameters. Example of a ruby command and how to run it in the background:
"World".insert 0,"Hello "
queue.push "World",:insert,0,"Hello "
FIFO is designed to work with ActiveRecord objects. Freshness of ActiveRecord objects is ensured by reloading them from the database when they’re popped from the queue. Consider an instance method to process images:
user.process_profile_image {:max_width => 200,:max_height => 150}
queue.push user,:process_profile_image,{:max_width => 200,:max_height => 150}
FIFO also works with ActiveRecord class methods. For example, sending newsletters:
UserMailer. emails
queue.push UserMailer,:deliver_newsletter,emails
Processing
Every item pushed into the queue is converted into a FIFO::Payload object which executes the passed method.
Basic flow of processing an item from the queue:
queue.push "Hello World",:length
payload = queue.pop
payload.process
=> 11
Retry functionality takes a failed payload and adds it back into the queue. Payload objects maintain total number of processing attempts.
queue.push "Hello World",:length
begin
payload = queue.pop
payload.process
rescue => ex
payload.retry if payload.attempts < 3
end
Process all items in the queue:
while(true)
payload = queue.pop
break unless payload
begin
payload.process
rescue => ex
payload.retry if payload.attempts < 3
end
end
Multiple queues can be used to simulate priority:
while(true)
payload = queue.pop
payload = queue1.pop unless payload
payload = queue2.pop unless payload
begin
payload.process
rescue => ex
payload.retry if payload.attempts < 3
end
sleep 10
end
The Payload flow can be entirely skipped by popping the raw response from the queue:
queue.pop :raw => true
Daemon
Installation
sudo gem install daemons
Install daemon_generator from: https://github.com/dougal/daemon_generator
./script/generate daemon <name>
Source
require File.dirname(__FILE__) + "/../../config/environment"
$running = true
Signal.trap("TERM") do
$running = false
end
FIFO::QueueManager.setup <aws_access_id>, <aws_secret_key>
queue = FIFO::Queue.new <queue_name>
logger = Logger.new(File.join(RAILS_ROOT,"log/<name>.rb.log"))
while($running) do
payload = queue.pop
if payload
begin
start_time = Time.now
payload.process
end_time = Time.now
logger.info "Finished #{payload.to_s} #{end_time - start_time}"
rescue => ex
if payload.attempts < 3
logger.info "Recovering #{payload.to_s}"
payload.retry
else
#Log Exception
end
end
end
sleep 5
end
To control the state of the daemon:
lib/daemon/<name>_ctl <start|stop|run>
SQS
Things to know about Amazon’s SQS:
- The order of items isn’t always maintained. It’s not 100% fifo but close enough.
- There are multiple connection modes to SQS: per_request, per_thread, single
- http://docs.amazonwebservices.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/Welcome.html
Credit
FIFO is built by the wonderful people who make Mine: http://getmine.com