mercury
Mercury is a messaging layer that hides complexity for typical messaging scenarios.
- backed by AMQP
- runs in an EventMachine reactor
- asynchronous API
- serializes messages as JSON
- optional monadic interface makes testing easier
- simulator allows tests to run without a RabbitMQ server
Constructs
In the above example, two publishers independently publish messages to the same source. One of them chooses to attach the tag foo.success to the messages it publishes.
When the source receives a message, it immediately broadcasts the message to all attached
queues.
Two independent listeners receive messages from the source. A listener has a private queue that is automatically removed when the listener disconnects from the server (i.e., any queued messages the listener would have received are lost).
A worker pool "A" has two workers handling messages. Messages in the queue are dealt to the workers as the workers acknowledge handling prior messages. If a worker disconnects without acknowledging completion, the message(s) it was working on are automatically replaced at the head of the queue. Worker queues persist even if all workers disconnect. Each worker pool has its own queue. Consequently, workers within a pool compete for messages, whereas workers across pools do not compete for messages.
Worker pool "B" has just one worker and has specified a tag filter.
Listeners and worker pools can specify tag filters which limit the messages they receive to only those with tags matching a particular pattern. A tag is zero or more words delimited by periods. A word is a string of alphanumeric characters. A filter is a string of zero or more words or wildcards. A wildcard is either * or #. * matches exactly one word. # matches zero or more words.
Note: A filter with zero words ('') matches no messages.
Note: Wildcards match only entire words. The filter f*.success does not match the tag foo.success.
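These matching rules mirror AMQP topic-exchange semantics and can be sketched in a few lines of plain Ruby. This is an illustrative matcher only, not mercury's implementation, and the name filter_matches? is invented for this sketch:

```ruby
# Illustrative sketch of the tag-filter rules above (not mercury's code).
# Words are compared whole: '*' consumes exactly one word, '#' consumes
# zero or more words, and anything else must match literally.
def match_words(fs, ts)
  if fs.empty?
    ts.empty?
  elsif fs.first == '#'
    # '#' either consumes zero words here, or consumes one and stays.
    match_words(fs.drop(1), ts) || (!ts.empty? && match_words(fs, ts.drop(1)))
  elsif fs.first == '*'
    !ts.empty? && match_words(fs.drop(1), ts.drop(1))
  else
    ts.first == fs.first && match_words(fs.drop(1), ts.drop(1))
  end
end

def filter_matches?(filter, tag)
  return false if filter.empty?   # a zero-word filter matches no messages
  match_words(filter.split('.'), tag.split('.'))
end
```

Note that filter_matches?('f*.success', 'foo.success') is false under these rules, because wildcards match only entire words.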
A typical scenario is to have a single worker pool processing messages.
The publishers are often instances of the same service: either some other mercury worker pool or a web service.
Operations
open(parallelism)
Creates a mercury instance. parallelism is a natural number that indicates the maximum total number of outstanding (unacknowledged) messages allowed for this instance's workers. See the code for additional parameters.
publish(source_name, msg, tag)
Publishes a message to a source, with an optional tag.
start_worker(worker_pool_name, source_name, tag_filter, msg_handler)
Starts a worker in the specified pool subscribing to the specified source with the specified filter (optional). Incoming messages are passed to the message handler procedure.
start_listener(source_name, tag_filter, msg_handler)
Starts a listener.
ack(msg)
Indicates message handling succeeded. The message is removed from the queue.
nack(msg)
Indicates message handling failed, with the assumption that it might succeed at a later time. The message is returned to the front of the queue.
reject(msg)
Indicates message handling failed, with the assumption that it can never succeed. The message is removed from the queue.
republish(msg)
Like nack, except the message is returned to the back of the queue.
Note: All operations create the referenced constructs if they do not already exist.
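The acknowledgement operations differ only in where the message ends up. A toy in-memory queue can illustrate this; ToyQueue and its methods are invented for this sketch and are not mercury's implementation:

```ruby
# Toy queue illustrating ack/nack/reject/republish semantics (not mercury's code).
class ToyQueue
  def initialize(msgs)
    @msgs = msgs
  end

  def take
    @msgs.shift             # deliver the next message to a worker
  end

  def ack(_msg); end        # success: the message stays removed
  def reject(_msg); end     # permanent failure: the message stays removed

  def nack(msg)
    @msgs.unshift(msg)      # retryable failure: returned to the front
  end

  def republish(msg)
    @msgs.push(msg)         # retryable failure: returned to the back
  end

  def to_a
    @msgs.dup
  end
end

q = ToyQueue.new([:a, :b, :c])
msg = q.take
q.nack(msg)       # queue is [:a, :b, :c] again
msg = q.take
q.republish(msg)  # queue is now [:b, :c, :a]
```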
Serialization
If a message is a hash, mercury serializes it to JSON upon sending and deserializes it upon receiving.
If a message is a string, mercury writes it directly without any serialization; this allows a client to pre-encode a message using an arbitrary encoding. The receiving client receives the exact same string as the message content, provided the string does not itself parse as valid JSON (in which case it is deserialized like any other JSON message).
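The round trip can be sketched as follows. The method names serialize and deserialize are hypothetical, invented for this illustration, but the behavior mirrors the rules above:

```ruby
require 'json'

# Sketch of the serialization rules above (method names are illustrative).
def serialize(msg)
  msg.is_a?(String) ? msg : JSON.generate(msg)
end

def deserialize(payload)
  JSON.parse(payload)
rescue JSON::ParserError
  payload   # not valid JSON: deliver the raw string unchanged
end
```

Note the caveat in the last rule: a pre-encoded string that happens to be valid JSON will be parsed on receipt, e.g. deserialize(serialize('{"a":1}')) yields a hash, not the original string.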
Thread safety
Mercury is not threadsafe. All calls to a particular instance must be made from the thread in which the instance was created.
Error handling
If there is a communication error (e.g., the connection to the server breaks) mercury raises an error. This behavior can be overridden.
Long running operations
The AMQP gem uses EventMachine timers to send heartbeats to the
server. Message handling blocks the event loop, so each message must
be handled within a few seconds, otherwise the server will miss a
heartbeat and disconnect the client. Long-running message handlers
should move their work to a thread pool to avoid blocking the event
loop.
EventMachine.defer and fiber_defer are two facilities for accomplishing this.
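With the real EventMachine API, a handler passes its work and a completion callback to EventMachine.defer. The following is a rough plain-Ruby analogue, purely illustrative: it uses a bare Thread instead of EventMachine's managed thread pool, and the defer method here is invented for this sketch:

```ruby
# Rough plain-Ruby analogue of deferring work off the event-loop thread.
# (Illustrative only; EventMachine.defer uses a managed thread pool and
# runs the callback back on the event loop, which this sketch does not.)
def defer(work, on_done)
  Thread.new do
    result = work.call     # long-running work happens off the main thread
    on_done.call(result)   # completion callback, e.g. where msg.ack belongs
  end
end

results = Queue.new
t = defer(proc { 21 * 2 },             # stand-in for a slow computation
          proc { |r| results << r })   # stand-in for acking the message
t.join
```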
Example Usage
require 'mercury'
def run
EventMachine.run do
Mercury.open do |m|
m.start_worker('cooks', 'orders', method(:handle_message)) do
# When this continuation block is called, the worker is guaranteed
# to have been started, so we can start publishing orders.
m.publish('orders', {'table' => 5, 'items' => ['salad', 'steak', 'cake']})
end
end
end
end
def handle_message(msg)
order = msg.content
cook(order)
msg.ack
end
Notably, mercury also has a monadic interface that hides the explicit continuation passing introduced by asynchrony, which has the effect of flattening chained calls. This is particularly useful for testing, where the same code plays both sides of a conversation. Compare:
require 'mercury'
Mercury.open do |m|
m.start_listener(source, proc{}) do
m.source_exists?(source) do |r1|
expect(r1).to be true
m.delete_source(source) do
m.source_exists?(source) do |r2|
expect(r2).to be false
m.close do
done
end
end
end
end
end
end
# ... vs ...
require 'mercury/monadic'
seql do
let(:m) { Mercury::Monadic.open }
and_then { m.start_listener(source) }
let(:r1) { m.source_exists?(source) }
and_lift { expect(r1).to be true }
and_then { m.delete_source(source) }
let(:r2) { m.source_exists?(source) }
and_lift { expect(r2).to be false }
and_then { m.close }
and_lift { done }
end
Monadic Interface
The monad is a versatile design pattern. There is plenty of
literature online, but for now all you need to know is that mercury
uses monad principles to chain together asynchronous operations. It
all starts with a Cps object.
2.2.2 :005 > add1 = Cps.new { |n, &k| k.call(n+1) }
=> #<Mercury::Cps:...>
Much like Proc.new, Cps.new merely captures some operation in an object but does not do any actual work. The key difference is that Cps captures a "continuation-passing style" ("CPS") operation. That is, instead of returning a value to the caller, the operation takes its continuation as an additional argument k. (The actual name doesn't matter, of course, but k is traditional.) k is simply a Proc.
You can loosely think of it as the "return Proc", as opposed to the usual return statement.
To invoke a Proc we call Proc#call. To invoke a Cps, we call Cps#run, passing the normal arguments as well as the continuation (the block).
2.2.2 :006 > add1.run(2) { |result| puts "result = #{result}" }
result = 3
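To make these snippets concrete, here is a minimal, self-contained sketch of such a class. This is a simplification for illustration, not mercury's actual Mercury::Cps, though it supports the Cps.new, run, and and_then usage shown in this section:

```ruby
# Minimal sketch of a continuation-passing-style wrapper
# (illustrative; not mercury's full Mercury::Cps).
class Cps
  def initialize(&op)
    @op = op              # capture the CPS operation; no work happens yet
  end

  def run(*args, &k)
    @op.call(*args, &k)   # invoke the operation with its continuation k
  end

  def and_then(&f)
    # Compose: run this operation, feed its result to f, which returns
    # the next Cps to run; that Cps's result goes to the final continuation.
    op = @op
    Cps.new do |*args, &k|
      op.call(*args) { |*results| f.call(*results).run(&k) }
    end
  end
end

add1 = Cps.new { |n, &k| k.call(n + 1) }
add1.run(2) { |result| puts "result = #{result}" }   # prints "result = 3"
```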
As you've seen already, asynchronous APIs are closely tied to CPS. In the case of mercury, an operation may involve a conversation with the server. CPS allows our code to go off and do other things -- namely, handle other independent requests -- but also be notified when the operation finally completes.
Combining operations
Cps provides a means of combining operations into a larger operation: Cps#and_then.
def add1(n)
Cps.new { |&k| k.call(n+1) }
end
def print_value(n)
Cps.new do |&k|
puts "value = #{n}"
k.call
end
end
def add1_and_print(n)
add1(n).and_then { |v| print_value(v) }
end
2.2.2 :028 > add1_and_print(2).run
value = 3
Cps#and_then's block accepts the result of the previous operation and returns the Cps object representing the next operation to perform.
As it turns out, the best way to factor an operation is as a method that
- accepts the operation arguments, and
- returns a Cps object
as seen above.
Sequences
As you can imagine, long and_then chains can get syntactically messy. This is where seq comes in.
def add1_and_print(n)
seq do |th|
th.en { add1(n) }
th.en { |v| print_value(v) }
end
end
This is still not ideal; it would be nice to have a way to bind v to the result of add1(n) rather than introducing a parameter on the line below. seql contains some magic that allows us to do exactly this. (It also eliminates the weird th.en.)
def add1_and_print(n)
seql do
let(:v) { add1(n) }
and_then { print_value(v) }
end
end
Another benefit of seql and let is that it makes v visible to all subsequent and_then blocks, not just the immediately following one.
But what if we want to introduce a non-CPS operation into our sequence?
def add1_and_print(n)
seql do
let(:v) { add1(n) }
and_then { puts 'added!' }
and_then { print_value(v) }
end
end
2.2.2 :061 > add1_and_print(2).run
added!
RuntimeError: 'and_then' block did not return a Cps object.
This fails because it violates the requirement that the and_then block return a Cps object, and puts does not. lift returns a Cps object for a block of direct-style code.
def add1_and_print(n)
seql do
let(:v) { add1(n) }
and_then { lift { puts 'added!' } }
and_then { print_value(v) }
end
end
2.2.2 :053 > add1_and_print(2).run
added!
value = 3
Finally, a little clean up:
def add1_and_print(n)
seql do
let(:v) { add1(n) }
and_lift { puts 'added!' }
and_then { print_value(v) }
end
end
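For intuition, the let/and_lift magic can be imitated in a toy, fully synchronous sketch. This is nothing like mercury's real implementation, which must also handle true asynchrony; ToyCps, ToySeq, and this seql are invented for illustration:

```ruby
# Toy synchronous sketch of a seql-like builder (illustrative only).
class ToyCps
  def initialize(&op); @op = op; end
  def run(&k); @op.call(&k); end
end

class ToySeq
  def initialize
    @bindings = {}
    @steps = []
  end

  # Bind the result of a CPS step to a name visible to later steps.
  def let(name, &step)
    @steps << [name, step]
  end

  # Run a CPS step, discarding its result.
  def and_then(&step)
    @steps << [nil, step]
  end

  # Lift a direct-style block into the sequence.
  def and_lift(&blk)
    and_then { ToyCps.new { |&k| k.call(blk.call) } }
  end

  # Make bound names readable inside later blocks.
  def method_missing(name, *args)
    @bindings.key?(name) ? @bindings[name] : super
  end

  def respond_to_missing?(name, include_private = false)
    @bindings.key?(name) || super
  end

  def run
    @steps.each do |name, step|
      instance_exec(&step).run { |result| @bindings[name] = result if name }
    end
    self
  end
end

def seql(&body)
  s = ToySeq.new
  s.instance_eval(&body)
  s
end

def add1(n)
  ToyCps.new { |&k| k.call(n + 1) }
end

def print_value(n)
  ToyCps.new { |&k| puts "value = #{n}"; k.call }
end

seql do
  let(:v) { add1(2) }
  and_lift { puts 'added!' }
  and_then { print_value(v) }
end.run
# prints "added!" then "value = 3"
```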
Mercury::Monadic
Mercury::Monadic simply wraps Mercury so that the methods return Cps objects rather than accepting an explicit continuation.
seql do
let(:m) { Mercury::Monadic.open }
and_then { m.start_listener(source) }
...
end
It is particularly useful when writing tests.
Design Details
Mercury#republish
This method publishes a copy of the message to the back of the
queue, then acks the original message. This is a similar operation to
ack/nack/reject. It is only applicable to messages received by
workers, since listener messages cannot be acknowledged. Unlike other acknowledgements, #republish takes a continuation (due to the publish operation), so the method is located on Mercury rather than Mercury::ReceivedMessage.
It is important that the message is republished to the AMQP queue and not the source. Acknowledgement is a worker concern. If two different worker pools were working off a common source, it wouldn't make sense for pool A to get a duplicate message because pool B failed to handle the message.
AMQP allows publishing to a particular queue by sending the message to
the default exchange, specifying the queue name as
the routing key. Thus, if the message was originally sent with a
non-empty routing key, that information is lost. Some clients rely on
the routing key/tag to dictate behavior (by reading
Mercury::ReceivedMessage#tag). To avoid breaking such clients, republish propagates the original tag in a header and reports this value as the tag on the republished message.
Republishing also introduces a Republish-Count header and corresponding attribute Mercury::ReceivedMessage#republish_count.
This value is incremented each time the message is republished.
Clients may want to check this value and act differently if it exceeds
some threshold.
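Such a guard might look like the following sketch, where the Msg struct stands in for Mercury::ReceivedMessage and the threshold and method name are invented for illustration:

```ruby
# Sketch of a republish-count guard (Msg stands in for
# Mercury::ReceivedMessage; the name and threshold are illustrative).
MAX_ATTEMPTS = 3
Msg = Struct.new(:content, :republish_count)

def handling_decision(msg)
  if msg.republish_count >= MAX_ATTEMPTS
    :reject      # give up: treat as a permanent failure
  else
    :republish   # try again later, from the back of the queue
  end
end
```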
Design Decisions
Continuation blocks are required
It would be possible to make continuation blocks optional. The problem is that this allows the user to make the mistake of treating the API as synchronous and only discover their error when tests fail, probably intermittently. An empty block can be passed, but at least it indicates that the continuation is being intentionally ignored.