Circuitry
Notification pub/sub and message queue processing using Amazon SNS & SQS.
Installation
Add this line to your application's Gemfile:
gem 'circuitry'
And then execute:
$ bundle
Or install it yourself as:
$ gem install circuitry
Usage
Circuitry is configured via its configuration object.
Circuitry.config do |c|
c.access_key = 'YOUR_AWS_ACCESS_KEY'
c.secret_key = 'YOUR_AWS_SECRET_KEY'
c.region = 'us-east-1'
c.logger = Rails.logger
c.error_handler = proc do |error|
Honeybadger.notify(error)
Honeybadger.flush
end
end
Available configuration options include:
access_key
: The AWS access key ID that has access to SNS publishing and/or SQS subscribing. (required)
secret_key
: The AWS secret access key that has access to SNS publishing and/or SQS subscribing. (required)
region
: The AWS region that your SNS and/or SQS account lives in. (optional, default: "us-east-1")
logger
: The logger to use for informational output, warnings, and error messages. (optional, default: Logger.new(STDOUT))
error_handler
: An object that responds to call with two arguments: the deserialized message contents and the topic name used when publishing to SNS. (optional, default: nil)
Publishing
Publishing is done via the Circuitry.publish method. It accepts a topic name
that represents the SNS topic, along with any non-nil object representing the
data to be serialized. Whatever object is passed will have its to_json method
called for serialization.
obj = { foo: 'foo', bar: 'bar' }
Circuitry.publish('any-topic-name', obj)
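Because serialization goes through to_json, a custom object can control its published payload by defining that method. The following is a minimal sketch using only Ruby's standard json library. The OrderPlaced class and its fields are hypothetical, and the Circuitry.publish call is left as a comment because it needs a configured AWS account.

```ruby
require 'json'

# Hypothetical event class; the name and fields are illustrative only.
class OrderPlaced
  def initialize(id, total)
    @id = id
    @total = total
  end

  # Builds the JSON payload that would be sent for this object.
  def to_json(*args)
    { id: @id, total: @total }.to_json(*args)
  end
end

event = OrderPlaced.new(42, 19.99)
payload = event.to_json
puts payload

# With Circuitry configured, the same object could be handed to publish:
# Circuitry.publish('any-topic-name', event)
```

Only the fields returned by to_json end up in the message, so internal state that should not leave the application can simply be omitted there.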
The publish method also accepts options that impact instantiation of the
Publisher object. The following options are currently available.
:async
- Whether or not publishing should occur in the background. Please refer to the Asynchronous Support section for more details regarding this option. (default: false)
obj = { foo: 'foo', bar: 'bar' }
Circuitry.publish('my-topic-name', obj, async: true)
Alternatively, if your options hash will remain unchanged, you can build a single
Publisher
object to use for all publishing.
options = { ... }
publisher = Circuitry::Publisher.new(options)
publisher.publish('my-topic-name', obj)
Subscribing
Subscribing is done via the Circuitry.subscribe method. It accepts an SQS queue
URL and takes a block for processing each message. This method runs
synchronously by default, and as such does not return.
Circuitry.subscribe('https://sqs.REGION.amazonaws.com/ACCOUNT-ID/QUEUE-NAME') do |message, topic_name|
puts "Received #{topic_name} message: #{message.inspect}"
end
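When one queue receives messages from several topics, the block can route on topic_name. The sketch below shows one possible way to do that with a lookup table. The topic names, the HANDLERS table, and the dispatch helper are all hypothetical, and it assumes the deserialized message is a Hash with string keys.

```ruby
# Hypothetical handlers keyed by topic name; names are illustrative only.
HANDLERS = {
  'user-created' => ->(message) { "welcome #{message['email']}" },
  'user-deleted' => ->(message) { "goodbye #{message['email']}" }
}.freeze

# Route a deserialized message to the handler registered for its topic.
def dispatch(message, topic_name)
  handler = HANDLERS.fetch(topic_name) do
    raise ArgumentError, "no handler for topic #{topic_name.inspect}"
  end
  handler.call(message)
end

puts dispatch({ 'email' => 'a@example.com' }, 'user-created')

# Inside a subscriber, the block would simply delegate:
# Circuitry.subscribe(queue_url) do |message, topic_name|
#   dispatch(message, topic_name)
# end
```

Raising on an unknown topic makes a misrouted message visible instead of silently dropping it.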
The subscribe method also accepts options that impact instantiation of the
Subscriber object. The following options are currently available.
:async
- Whether or not subscribing should occur in the background. Please refer to the Asynchronous Support section for more details regarding this option. (default: false)
:wait_time
- The number of seconds to wait for messages while connected to SQS. Anything above 0 results in long-polling, while 0 results in short-polling. (default: 10)
:batch_size
- The number of messages to retrieve in a single SQS request. (default: 10)
Circuitry.subscribe('https://...', async: true, wait_time: 60, batch_size: 20) do |message, topic_name|
# ...
end
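Note that the SQS ReceiveMessage API itself accepts a wait time of at most 20 seconds and at most 10 messages per request. Whether Circuitry adjusts larger values is not stated here, so it may be safer to clamp them before subscribing. The sketch below shows one way to do that. The sqs_safe_options helper and its constants are hypothetical and are not part of Circuitry.

```ruby
# Limits of the SQS ReceiveMessage API.
SQS_MAX_WAIT_TIME = 20   # seconds
SQS_MAX_BATCH_SIZE = 10  # messages per request

# Hypothetical helper: returns subscribe options clamped to the SQS limits.
def sqs_safe_options(wait_time: 10, batch_size: 10)
  {
    wait_time: wait_time.clamp(0, SQS_MAX_WAIT_TIME),
    batch_size: batch_size.clamp(1, SQS_MAX_BATCH_SIZE)
  }
end

options = sqs_safe_options(wait_time: 60, batch_size: 20)
puts options.inspect

# The clamped options could then be passed along:
# Circuitry.subscribe('https://...', options) do |message, topic_name|
#   # ...
# end
```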
Alternatively, if your options hash will remain unchanged, you can build a single
Subscriber
object to use for all subscribing.
options = { ... }
subscriber = Circuitry::Subscriber.new(options)
subscriber.subscribe('https://...') do |message, topic_name|
# ...
end
Asynchronous Support
Publishing or subscribing asynchronously occurs by forking a child process. That child is then detached so that your application does not need to worry about waiting for the process to finish.
There are two important notes regarding forking in general as it relates to asynchronous support:
- Forking is not supported on all platforms (e.g., Windows and NetBSD 4), requiring that your implementation use synchronous requests in such circumstances. You can determine if asynchronous requests will work by calling Circuitry.platform_supports_async?.
- Forking results in resources being copied from the parent process to the child process. In order to prevent database connection errors and the like, you should properly handle closing and reopening resources before and after forking, respectively. For example, if you are using Rails with Unicorn, you may need to add the following code to your unicorn.rb configuration:

before_fork do |server, worker|
  if defined?(ActiveRecord::Base)
    ActiveRecord::Base.connection.disconnect!
  end
end

after_fork do |server, worker|
  if defined?(ActiveRecord::Base)
    ActiveRecord::Base.establish_connection(
      Rails.application.config.database_configuration[Rails.env]
    )
  end
end

Refer to your adapter's documentation to determine how resources are handled with regards to forking.
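To make the fork-and-detach pattern described in this section concrete, here is a minimal sketch in plain Ruby. It is not Circuitry's actual implementation. The run_in_background helper is hypothetical, and the pipe exists only so the example can show the child's output.

```ruby
# Minimal sketch of fork-and-detach; not Circuitry's actual implementation.
def run_in_background
  # Fall back to running inline on platforms without fork (e.g. Windows).
  unless Process.respond_to?(:fork)
    yield
    return nil
  end

  pid = fork do
    yield
  end
  # Detach so the child is reaped without the parent waiting on it.
  Process.detach(pid)
  pid
end

reader, writer = IO.pipe
pid = run_in_background do
  writer.puts "hello from process #{Process.pid}"
end
writer.close          # the parent no longer needs the write end
line = reader.gets    # blocks until the line written above arrives
reader.close
puts line
```

The helper returns the child's pid when it forked and nil when it had to run the block synchronously, which mirrors the platform check mentioned above.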
Development
After checking out the repo, run bin/setup
to install dependencies. Then, run
bin/console
for an interactive prompt that will allow you to experiment.
To install this gem onto your local machine, run bundle exec rake install. To
release a new version, update the version number in version.rb, and then run
bundle exec rake release to create a git tag for the version, push git commits
and tags, and push the .gem file to rubygems.org.
Contributing
- Fork it ( https://github.com/kapost/circuitry/fork )
- Create your feature branch (git checkout -b my-new-feature)
- Commit your changes (git commit -am 'Add some feature')
- Push to the branch (git push origin my-new-feature)
- Create a new Pull Request