Circuitry

Notification pub/sub and message queue processing using Amazon SNS & SQS.

Installation

Add this line to your application's Gemfile:

gem 'circuitry'

And then execute:

$ bundle

Or install it yourself as:

$ gem install circuitry

Usage

Circuitry is configured via its configuration object.

Circuitry.config do |c|
  c.access_key = 'YOUR_AWS_ACCESS_KEY'
  c.secret_key = 'YOUR_AWS_SECRET_KEY'
  c.region = 'us-east-1'
  c.logger = Rails.logger
  c.error_handler = proc do |error|
    Honeybadger.notify(error)
    Honeybadger.flush
  end
end

Available configuration options include:

  • access_key: The AWS access key ID that has access to SNS publishing and/or SQS subscribing. (required)
  • secret_key: The AWS secret access key that has access to SNS publishing and/or SQS subscribing. (required)
  • region: The AWS region that your SNS and/or SQS account lives in. (optional, default: "us-east-1")
  • logger: The logger to use for informational output, warnings, and error messages. (optional, default: Logger.new(STDOUT))
  • error_handler: An object that responds to call with the exception object as its argument, as in the example above. (optional, default: nil)
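
Because the handler only needs to respond to call, a plain object can be used in place of a proc. The sketch below assumes, as in the configuration example above, that the handler is invoked with the raised exception. LoggingErrorHandler is a hypothetical name and is not part of Circuitry.

```ruby
require 'logger'
require 'stringio'

# Hypothetical error handler: any object that responds to `call` will do.
class LoggingErrorHandler
  def initialize(logger)
    @logger = logger
  end

  # Assumption: invoked with the exception that was raised.
  def call(error)
    @logger.error("#{error.class}: #{error.message}")
  end
end

# A StringIO stands in for a real log destination in this sketch.
output = StringIO.new
handler = LoggingErrorHandler.new(Logger.new(output))
handler.call(RuntimeError.new('queue unreachable'))

# In an application: Circuitry.config { |c| c.error_handler = handler }
```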

Publishing

Publishing is done via the Circuitry.publish method. It accepts a topic name that represents the SNS topic, along with any non-nil object representing the data to be serialized. Whatever object is passed will have its to_json method called for serialization.

obj = { foo: 'foo', bar: 'bar' }
Circuitry.publish('any-topic-name', obj)
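
Since serialization relies on to_json, a custom object can control what gets published by defining that method. The following is a minimal sketch using only Ruby's standard json library. The Signup class is hypothetical and serves only as an illustration.

```ruby
require 'json'

# Hypothetical domain object; it only needs to respond to `to_json`.
class Signup
  def initialize(email, plan)
    @email = email
    @plan = plan
  end

  # Forward generator arguments so the object also serializes when nested.
  def to_json(*args)
    { email: @email, plan: @plan }.to_json(*args)
  end
end

hash_payload = { foo: 'foo', bar: 'bar' }.to_json
object_payload = Signup.new('user@example.com', 'pro').to_json

puts hash_payload    # {"foo":"foo","bar":"bar"}
puts object_payload  # {"email":"user@example.com","plan":"pro"}
```

Passing a Signup instance to Circuitry.publish would then be expected to send the second payload.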

The publish method also accepts options that affect how the Publisher object is instantiated. The following option is currently supported.

  • :async - Whether or not publishing should occur in the background. Please refer to the Asynchronous Support section for more details regarding this option. (default: false)

obj = { foo: 'foo', bar: 'bar' }
Circuitry.publish('my-topic-name', obj, async: true)

Alternatively, if your options hash will remain unchanged, you can build a single Publisher object to use for all publishing.

options = { ... }
publisher = Circuitry::Publisher.new(options)
publisher.publish('my-topic-name', obj)

Subscribing

Subscribing is done via the Circuitry.subscribe method. It accepts an SQS queue URL and takes a block for processing each message. This method runs synchronously by default and keeps polling the queue, so it does not return.

Circuitry.subscribe('https://sqs.REGION.amazonaws.com/ACCOUNT-ID/QUEUE-NAME') do |message, topic_name|
  puts "Received #{topic_name} message: #{message.inspect}"
end
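
A single SQS queue can be subscribed to several SNS topics, so the block often needs to branch on topic_name. The sketch below shows one way to do that. The topic names and the route_message helper are hypothetical, and the message is assumed to arrive as a deserialized hash with string keys.

```ruby
# Hypothetical routing table: one handler per SNS topic name.
HANDLERS = {
  'user-created' => ->(message) { "welcome #{message['email']}" },
  'user-deleted' => ->(message) { "goodbye #{message['email']}" }
}.freeze

def route_message(message, topic_name)
  handler = HANDLERS.fetch(topic_name) do
    # Unknown topics are reported rather than silently dropped.
    return "ignored #{topic_name}"
  end
  handler.call(message)
end

puts route_message({ 'email' => 'a@example.com' }, 'user-created')
puts route_message({}, 'unrelated-topic')
```

Inside a subscriber, the block body would then reduce to route_message(message, topic_name).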

The subscribe method also accepts options that affect how the Subscriber object is instantiated. The following options are currently supported.

  • :async - Whether or not subscribing should occur in the background. Please refer to the Asynchronous Support section for more details regarding this option. (default: false)
  • :wait_time - The number of seconds to wait for messages while connected to SQS. Anything above 0 results in long-polling, while 0 results in short-polling. SQS itself accepts at most 20 seconds. (default: 10)
  • :batch_size - The number of messages to retrieve in a single SQS request. SQS itself returns at most 10 messages per request. (default: 10)

Circuitry.subscribe('https://...', async: true, wait_time: 20, batch_size: 5) do |message, topic_name|
  # ...
end

Alternatively, if your options hash will remain unchanged, you can build a single Subscriber object to use for all subscribing.

options = { ... }
subscriber = Circuitry::Subscriber.new(options)
subscriber.subscribe('https://...') do |message, topic_name|
  # ...
end

Asynchronous Support

Publishing or subscribing asynchronously occurs by forking a child process. That child is then detached so that your application does not need to worry about waiting for the process to finish.
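
The fork-and-detach pattern described above can be sketched with Ruby's standard Process API. This illustrates the general technique and is not taken from Circuitry's implementation. The run_in_background helper is hypothetical, and the pipe exists only so the demo can show what the child did.

```ruby
# Hypothetical helper: runs the block in a forked child and detaches it,
# so the parent never has to call Process.wait itself.
def run_in_background
  pid = fork { yield }
  Process.detach(pid) # returns a thread that reaps the child when it exits
end

reader, writer = IO.pipe

reaper = run_in_background do
  # This block executes in the child process only.
  reader.close
  writer.write("published from child #{Process.pid}")
  writer.close
end

# The parent is free to continue immediately. Reading from the pipe and
# joining the reaper thread here only make the demo deterministic.
writer.close
message = reader.read
reader.close
reaper.join

puts message
```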

There are two important notes regarding forking in general as it relates to asynchronous support:

  1. Forking is not supported on all platforms (e.g., Windows and NetBSD 4), requiring that your implementation use synchronous requests in such circumstances. You can determine if asynchronous requests will work by calling Circuitry.platform_supports_async?.

  2. Forking results in resources being copied from the parent process to the child process. In order to prevent database connection errors and the like, you should properly handle closing and reopening resources before and after forking, respectively. For example, if you are using Rails with Unicorn, you may need to add the following code to your unicorn.rb configuration:

    before_fork do |server, worker|
      if defined?(ActiveRecord::Base)
        ActiveRecord::Base.connection.disconnect!
      end
    end
    
    after_fork do |server, worker|
      if defined?(ActiveRecord::Base)
        ActiveRecord::Base.establish_connection(
          Rails.application.config.database_configuration[Rails.env]
        )
      end
    end
    

Refer to your adapter's documentation to determine how resources are handled with regard to forking.
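
Code that must run on every platform can fall back to synchronous requests when forking is unavailable. The sketch below uses Ruby's own Process.respond_to?(:fork) as a stand-in for the platform check. Whether Circuitry.platform_supports_async? is implemented this way is an assumption, and publish_options is a hypothetical helper.

```ruby
# Hypothetical helper: enables :async only where the platform can fork.
# The argument defaults to Ruby's built-in check and can be overridden in tests.
def publish_options(fork_available = Process.respond_to?(:fork))
  { async: fork_available }
end

options = publish_options
puts options.inspect

# In an application, the library's own check could be used directly:
#   Circuitry.publish('my-topic-name', obj, async: Circuitry.platform_supports_async?)
```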

Development

After checking out the repo, run bin/setup to install dependencies. Then, run bin/console for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release to create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org.

Contributing

  1. Fork it ( https://github.com/kapost/circuitry/fork )
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create a new Pull Request