Stapfen
Stapfen is a simple gem to make writing workers that consume messages via STOMP or JMS easier.
Stapfen allows you to write one worker class, and use either protocol depending on the environment and needs.
Usage
(Examples can be found in the examples/
directory)
Consider the following myworker.rb
file:
class MyWorker < Stapfen::Worker
configure do |worker|
# You can also specify your own logger, but this is the default...
worker.logger = Logger.new(STDOUT)
worker.protocol = STOMP
worker. = {
:hosts => [
{
:host => 'localhost',
:port => 61613,
:login => 'guest',
:passcode => 'guest',
:ssl => false
}
],
:topic => 'thequeue',
:dead_letter_queue => '/queue/dlq',
:max_redeliveries => 0
}
end
consume do ||
data = expensive_computation(.body)
# Save my data, or do something worker-specific with it
persist(data)
# Send another message
client.publish('/topic/computation-acks', "finished with #{.}")
end
end
MyWorker.run!
When using the STOMP protocol, worker.client_options
can be set with any of the attributes described in a Stomp::Client
connection
hash as well as any subscription
options.
When using the JMS protocol, worker.client_options
can be set with any of the attributes described in configuration
hash for the
jruby-jms gem.
Kafka example
Using with Kafka requires a configuration with the topic, groupID, and zookeepers string.
require 'stapfen'
require 'stapfen/worker'
class MyWorker < Stapfen::Worker
configure do |worker|
# You can also specify your own logger, but this is the default...
worker.logger = Logger.new(STDOUT)
worker.protocol = KAFKA
worker. = {
:topic => 'test',
:groupId => 'groupId',
:zookeepers => 'localhost:2181' # comma separated string of zookeepers
}
end
consume do ||
puts "Recv: #{.body}"
end
end
MyWorker.run!
Notes
- Testing with Kafka
- Start Staphen worker first
- Using producer included with kafka
- Produce some messages
echo foobar | bin/kafka-console-producer.sh --broker-list <brokers> --topic <topic>
- Worker should be able to read the message
- using the same groupId a consumer will start reading from the last offset that was read by a consumer from the same group
- For example, Given 2 consumers belong to the same groupId
- Consumer1 reads a few messages and dies
- A producer produces 5 messages
- Consumer2 starts up and will receive the 5 messages produced because it started at the last offset of Consumer1
It is also important to note that the consume
block will be invoked inside an
instance of MyWorker
and will execute inside its own Thread
, so take
care when accessing other shared resources.
Also note you'll need to include the zk gem manually.
Fallback and dead-letter-queue support
The consume block accepts the usual subscriptions headers, as well as two
additional headers :dead_letter_queue
and :max_redeliveries
. If either of
the latter two is present, the consumer will unreceive any messages for which
the block returns false
; after :max_redeliveries
, it will send the message
to :dead_letter_queue
. consume
blocks without these headers will fail
silently rather than unreceive.
Installation
Add this line to your application's Gemfile:
gem 'stapfen'
And then execute:
$ bundle
Or install it yourself as:
$ gem install stapfen
Running Specs
Download this from jar from Maven Central
- activemq-all-5.8.0.jar
wget -O activemq-all-5.8.0.jar http://search.maven.org/remotecontent?filepath=org/apache/activemq/activemq-all/5.8.0/activemq-all-5.8.0.jar
- Put it in gem root
rake spec