Resque Bus
This gem uses Redis and Resque to allow simple asynchronous communication between apps.
Example
Application A can publish an event
# config
ResqueBus.redis = "192.168.1.1:6379"
# business logic
ResqueBus.publish("user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith")
# or do it later
ResqueBus.publish_at(1.hour.from_now, "user_created", "id" => 42, "first_name" => "John", "last_name" => "Smith")
Application B is subscribed to events
# config
ResqueBus.redis = "192.168.1.1:6379"
# initializer
ResqueBus.dispatch("app_b") do
  # processes events on the app_b_default queue
  # subscribe is shorthand for subscribing to your 'default' queue; this block will process events with the name "user_created"
  subscribe "user_created" do |attributes|
    NameCount.find_or_create_by_name(attributes["last_name"]).increment!
  end
  # processes events on the app_b_critical queue
  # critical is shorthand for subscribing to your 'critical' queue; this block will process events with the name "user_paid"
  critical "user_paid" do |attributes|
    CreditCard.charge!(attributes)
  end
  # you can use any queue name you would like to process from as well, e.g. `banana "peeled" do |attributes|`
  # regexes work as well. note that with the above configuration along with this regex,
  # both the following block and the corresponding block above would be executed for a matching event
  subscribe /^user_/ do |attributes|
    Metrics.record_user_action(attributes["bus_event_type"], attributes["id"])
  end
  # the above all filter on just the event type, but you can filter on anything
  # this would match _any_ event that has a user_id and a page value of "homepage", regardless of bus_event_type
  subscribe "my_key", { "user_id" => :present, "page" => "homepage" } do |attributes|
    Mixpanel.homepage_action!(attributes["action"])
  end
end
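To make the filtering rules above concrete, here is a minimal sketch of how a subscription filter could be compared against an event's attributes. It is an illustration only, not ResqueBus's internal code: `value_matches?` and `event_matches?` are hypothetical helpers, and the treatment of `:present` as "not nil and not blank" is an assumption.

```ruby
# Hypothetical helpers for illustration; not part of the ResqueBus API.
# Compare one expected filter value against the actual attribute value.
def value_matches?(expected, actual)
  case expected
  when :present then !actual.nil? && !actual.to_s.strip.empty? # assumed meaning of :present
  when Regexp   then !actual.nil? && expected.match?(actual.to_s)
  else               expected.to_s == actual.to_s
  end
end

# An event matches only when every key in the filter matches.
def event_matches?(filters, attributes)
  filters.all? { |key, expected| value_matches?(expected, attributes[key]) }
end

user_filter     = { "bus_event_type" => /^user_/ }
homepage_filter = { "user_id" => :present, "page" => "homepage" }

event = { "bus_event_type" => "user_created", "user_id" => 42, "page" => "homepage" }

regex_hit    = event_matches?(user_filter, event)      # bus_event_type starts with "user_"
homepage_hit = event_matches?(homepage_filter, event)  # user_id is set and page is "homepage"
no_user_hit  = event_matches?(homepage_filter, { "page" => "homepage" }) # user_id is missing
```

Under these assumptions a single event can satisfy several filters at once, which is why both the `"user_created"` block and the `/^user_/` block would run for the same event.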
Applications can also subscribe within classes using the provided Subscriber module.
class SimpleSubscriber
  include ResqueBus::Subscriber
  subscribe :my_method
  def my_method(attributes)
    # heavy lifting
  end
end
The following is equivalent to the original initializer and shows more options:
class OtherSubscriber
  include ResqueBus::Subscriber
  application :app_b
  subscribe :user_created
  subscribe_queue :app_b_critical, :user_paid
  subscribe_queue :app_b_default, :user_action, :bus_event_type => /^user_/
  subscribe :homepage_method, :user_id => :present, :page => "homepage"
  def user_created(attributes)
    NameCount.find_or_create_by_name(attributes["last_name"]).increment!
  end
  def user_paid(attributes)
    CreditCard.charge!(attributes)
  end
  def user_action(attributes)
    Metrics.record_user_action(attributes["bus_event_type"], attributes["id"])
  end
  def homepage_method(attributes)
    Mixpanel.homepage_action!(attributes["action"])
  end
end
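For readers unfamiliar with class-level macros such as `subscribe`, the following toy sketch shows one way a mixin could record subscriptions and route events to instance methods. It is not the source of `ResqueBus::Subscriber`: `ToySubscriber`, `deliver`, and `ToyUserSubscriber` are hypothetical names, only exact-value filters are handled, and the rule that a bare method name doubles as the event type is an assumption drawn from the examples above.

```ruby
# Toy stand-in for a subscriber mixin; NOT the ResqueBus::Subscriber source.
module ToySubscriber
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Each entry is [method_name, filters].
    def subscriptions
      @subscriptions ||= []
    end

    # Assumption: with no explicit filters, the method name is the event type.
    def subscribe(method_name, filters = nil)
      filters ||= { "bus_event_type" => method_name.to_s }
      subscriptions << [method_name, filters]
    end

    # Call every registered method whose filters all equal the event's values.
    def deliver(attributes)
      subscriptions.each do |method_name, filters|
        next unless filters.all? { |key, value| attributes[key.to_s] == value }
        new.public_send(method_name, attributes)
      end
    end
  end
end

class ToyUserSubscriber
  include ToySubscriber
  subscribe :user_created

  # Collects handled names so the effect is observable in this sketch.
  def self.names
    @names ||= []
  end

  def user_created(attributes)
    self.class.names << attributes["last_name"]
  end
end

ToyUserSubscriber.deliver("bus_event_type" => "user_created", "last_name" => "Smith")
ToyUserSubscriber.deliver("bus_event_type" => "user_paid", "last_name" => "Jones")
```

In this sketch only the first event reaches `user_created`; the second is ignored because no subscription matches its event type.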
The subscription block is run inside a Resque worker which needs to be started for each app.
$ rake resquebus:setup resque:work
The incoming queue also needs to be processed, either on a dedicated server or on all of the app servers.
$ rake resquebus:driver resque:work
If you want retry to work for subscribing apps, you should run resque-scheduler
$ rake resquebus:driver resque:scheduler
Compatibility
ResqueBus can live alongside another instance of Resque that points at a different Redis server.
# config
Resque.redis = "192.168.1.0:6379"
ResqueBus.redis = "192.168.1.1:6379"
If no Redis instance is given specifically, ResqueBus will use the Resque one.
# config
Resque.redis = "192.168.1.0:6379"
That will use the default (resque) namespace, which can be helpful when using the Resque tooling. Conflicts with queue names are unlikely, but you can change the namespace if you like.
# config
Resque.redis = "192.168.1.0:6379"
ResqueBus.redis.namespace = :get_on_the_bus
Local Mode
For development, a local mode is also provided and is specified in the configuration.
# config
ResqueBus.local_mode = :standalone
or
ResqueBus.local_mode = :inline
Standalone mode does not require a separate resquebus:driver task to be running to process the incoming queue. Simply publishing to the bus will distribute the incoming events to the appropriate application-specific queue. A separate resquebus:work task does still need to be run to process these events.
Inline mode skips queue processing entirely and directly dispatches the event to the appropriate code block.
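The difference between dispatching inline and dispatching through a queue can be pictured with a small in-memory toy. This is a sketch of the idea only, not ResqueBus code: `ToyBus`, its `:queued` mode name, and the `work` method are hypothetical, and no Redis or Resque is involved.

```ruby
# Toy in-memory bus for illustration; not how ResqueBus is implemented.
class ToyBus
  attr_reader :queue

  def initialize(mode)
    @mode = mode # :inline or :queued (hypothetical mode names)
    @handlers = Hash.new { |hash, key| hash[key] = [] }
    @queue = []
  end

  def subscribe(event_type, &block)
    @handlers[event_type] << block
  end

  def publish(event_type, attributes = {})
    payload = attributes.merge("bus_event_type" => event_type)
    if @mode == :inline
      run(payload)      # handlers run immediately, in the publisher's process
    else
      @queue << payload # handlers wait until a worker drains the queue
    end
  end

  # Stands in for a worker process pulling events off the queue.
  def work
    run(@queue.shift) until @queue.empty?
  end

  private

  def run(payload)
    @handlers[payload["bus_event_type"]].each { |handler| handler.call(payload) }
  end
end

seen = []

inline = ToyBus.new(:inline)
inline.subscribe("user_created") { |attributes| seen << attributes["id"] }
inline.publish("user_created", "id" => 42) # handled right away

queued = ToyBus.new(:queued)
queued.subscribe("user_created") { |attributes| seen << attributes["id"] }
queued.publish("user_created", "id" => 43) # waits in queued.queue
queued.work                                # now the handler runs
```

The inline path is convenient in development and tests because the effect of a publish is visible as soon as the call returns, while the queued path mirrors the need for a running worker.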
TODO
- There are a few spots in the code with TODO notes
- Make this not freak out in development without Redis or when Redis is down
- We might not actually need to publish in tests
- Add some rspec helpers for the apps to use: should_ post an event_publish or something along those lines
- A synchronous version will be needed for several use cases. Make it so that an event can go in real-time to one subscriber and still be async to the rest.
- Should this use resque-retry or should they just go into the failure queue?
Copyright (c) 2011 Brian Leonard, released under the MIT license