Coroutines
A library for creating and composing producer/transformer/consumer coroutines. Producers are already provided by Ruby’s built-in Enumerator class; this library adds Transformer and Consumer classes that work analogously. Like Enumerator, they are based on Fiber rather than on threads (unlike some other producer/consumer libraries). The library also provides a module Sink, which is analogous to Enumerable, and Enumerable/Transformer/Sink composition using overloaded <= and >= operators.
Installing
gem install coroutines
Using
A simple consumer:
require 'coroutines'
def counter(start)
result = start
loop { result += await }
"Final value: #{result}"
end
co = consum_for :counter, 10 # => #<Consumer: main:counter (running)>
co << 10 << 1000 << 10000
co.close # => "Final value: 11020"
The call to Consumer#close raises StopIteration at the point at which the consumer last executed await. In this case, the StopIteration is caught by loop, causing it to terminate.
Note that this is an intentionally simplistic example intended to show the basic library API. Of course, a counter could just as easily be implemented using a closure; the advantage of a consumer is that the implementation could involve arbitrary control structures with multiple calls to await.
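To make the "multiple calls to await" point more concrete, here is a minimal sketch that uses only Ruby's built-in Fiber and not this library. It assumes Ruby 2.7 or later for Fiber#raise. Fiber.yield stands in for await and raising StopIteration into the fiber stands in for Consumer#close; this only illustrates the control flow described above and is not necessarily how the library is implemented.

```ruby
# Stdlib-only sketch; does not use the coroutines gem.
report = Fiber.new do
  title = Fiber.yield              # first "await": a header value
  total = 0
  loop { total += Fiber.yield }    # later "awaits": numbers, until closed
  "#{title}: #{total}"             # block result is returned to the closer
end

report.resume                      # run up to the first Fiber.yield
report.resume("Final value")       # feeds the header
[10, 1000, 10000].each { |v| report.resume(v) }

# Raise StopIteration at the point of the last Fiber.yield. Kernel#loop
# rescues it, the loop ends, and the fiber's block returns its final string.
result = report.raise(StopIteration)
```

The two Fiber.yield calls sit at different points in the control flow, which is the part that is awkward to express with a plain closure.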
A simple transformer:
require 'coroutines'
def running_sum(start)
result = start
loop { result += await; yield result }
end
tr = trans_for :running_sum, 3 # => #<Transformer: main:running_sum>
sums = (1..10) >= tr # => #<Enumerator: #<Transformer: main:running_sum> <= 1..10>
sums.to_a # => [4, 6, 9, 13, 18, 24, 31, 39, 48, 58]
tr = trans_for :running_sum, 0 # => #<Transformer: main:running_sum>
collect_sums = tr >= []
collect_sums << 1 << 1 << 2 << 3 << 5
collect_sums.close # => [1, 2, 4, 7, 12]
Again, this is just a basic demonstration of the API that could be written without resorting to coroutines (though probably not quite as succinctly).
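For comparison, a coroutine-free version of the source-side use could look like the following sketch. It relies only on the built-in Enumerator; the helper name running_sum_enum is made up for this illustration and is not part of the library.

```ruby
# Stdlib-only sketch; running_sum_enum is a hypothetical helper name.
def running_sum_enum(source, start)
  Enumerator.new do |out|
    total = start
    # Push each partial sum downstream as soon as it is computed.
    source.each { |x| out << (total += x) }
  end
end

sums = running_sum_enum(1..10, 3).to_a
# The enumerator is driven on demand, so an endless source is fine
# as long as only a prefix is requested.
first_three = running_sum_enum(1..Float::INFINITY, 0).first(3)
```

This covers only the "enumerable in, enumerator out" direction. The sink-side use shown above (tr >= []) has no equally direct counterpart in plain Enumerator, which is where a transformer object helps.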
Sources and sinks
As you probably know, many Ruby classes use the Enumerable mixin to provide common functionality like mapping and filtering of sequences of values. We can think of such classes as “sources” of the values yielded by their respective each methods. In the same way, several classes use the << operator to accept sequences of values:
$stdout << "a" << "b" << "c" # prints "abc"
[] << "a" << "b" << "c" # => ["a", "b", "c"]
"" << "a" << "b" << "c" # => "abc"
The coroutines library provides the mixin Sink for such classes. Among other methods, this provides an operator <= for “connecting” sources (i.e. enumerables) to sinks.
open("test.txt", "w") <= ["a", "b", "c"] # write "abc" to test.txt
["a", "b", "c"] <= ("d".."f") # => ["a", "b", "c", "d", "e", "f"]
"abc" <= ("d".."f") # => "abcdef"
Note that <= closes the sink after having exhausted the enumerable, returning whatever #close returns. close defaults to simply returning the sink itself, as in the Array and String examples above. For IO, which has its own close implementation, this implies that the file descriptor will be closed after the <= operation finishes. If this is not what you want, use dup:
$stdout.dup <= ["a", "b", "c"] # print "abc"
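The contract described above (feed every element with <<, then return whatever close returns) can be sketched in a few lines. MiniSink, Collector and Joiner are hypothetical names used only for this illustration; the library's actual Sink module provides more than this and may be implemented differently.

```ruby
# Stdlib-only sketch; MiniSink is a hypothetical stand-in for Sink.
module MiniSink
  # Default close: nothing to release, so hand back the sink itself.
  def close
    self
  end

  # Feed every element of source into the sink via <<, then close it.
  def <=(source)
    source.each { |item| self << item }
    close
  end
end

# A sink that keeps the default close.
class Collector
  include MiniSink
  attr_reader :items

  def initialize
    @items = []
  end

  def <<(item)
    @items << item
    self
  end
end

# A sink that overrides close to return a summary instead of itself.
class Joiner < Collector
  def close
    items.join
  end
end

collector = Collector.new <= ("d".."f")   # default close returns the sink
joined    = Joiner.new <= ("d".."f")      # overridden close returns a String
```

The Joiner case mirrors the IO behaviour described above: whatever close does, and whatever it returns, becomes the outcome of the <= operation.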
For symmetry, the coroutines library augments Enumerable with an operator >= that mirrors <=:
("d".."f") >= "abc" # => "abcdef"
Pipelines involving transformers
Re-using the running_sum example from above:
(1..10) >= trans_for(:running_sum, 0) >= proc{|x| x.to_s + "\n" } >= $stdout.dup
What does this do? Well, it takes the sequence of integers from 1 to 10, then computes the running sum, then converts each partial sum to a string, and finally prints each string to $stdout. Except that the “thens” in the previous sentence are not entirely correct: the processing stages run interleaved rather than one after another, with each value passing through all stages before the next one is read. (The stages are coroutines, not threads, so blocking IO in one stage will block all other stages.) Instead of (1..10), we could have a File and iterate over GBs of data, and at no point would we need to have the entire sequence in memory.
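The interleaving can be observed with a small experiment. The sketch below uses Ruby's built-in Enumerator::Lazy rather than this library, so it only illustrates the processing order and says nothing about the library's internals; the log array is there purely for the demonstration.

```ruby
# Stdlib-only sketch; records the order in which the stages run.
log = []
total = 0

pipeline = (1..3).lazy
  .map { |x| log << "sum after #{x}"; total += x }       # running sum stage
  .map { |sum| log << "format #{sum}"; "#{sum}\n" }      # to-string stage

# Final stage: stands in for printing to $stdout.
pipeline.each { |line| log << "write #{line.chomp}" }

# log is now:
# ["sum after 1", "format 1", "write 1",
#  "sum after 2", "format 3", "write 3",
#  "sum after 3", "format 6", "write 6"]
```

Each value travels through all three stages before the next one is read, so no intermediate array of partial sums or strings is ever built.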
Any part of a pipeline can be passed around and stored as an individual object:
enum = (1..10) >= trans_for(:running_sum, 0) >= proc{|x| x.to_s + "\n" } # => an Enumerator
cons = trans_for(:running_sum, 0) >= proc{|x| x.to_s + "\n" } >= $stdout.dup # => a Consumer
trans = trans_for(:running_sum, 0) >= proc{|x| x.to_s + "\n" } # => a Transformer
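As a rough analogy in plain Ruby (2.6 or later, for Proc#>>), a reusable pipeline segment can be modelled as a lambda from an enumerable to a lazy enumerator. The names running_sum_stage and to_line_stage are invented for this sketch; the library's Transformer objects are more general, since they can also be attached to sinks.

```ruby
# Stdlib-only sketch; stage names are hypothetical.
running_sum_stage = lambda do |source|
  total = 0                              # fresh state for every use
  source.lazy.map { |x| total += x }
end
to_line_stage = ->(source) { source.lazy.map { |x| "#{x}\n" } }

# Compose two stages into one storable object, comparable to `trans` above.
trans = running_sum_stage >> to_line_stage

lines = trans.call(1..4).to_a            # attach a source and run it
again = trans.call([10, 20]).to_a        # the same object can be reused
```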
map/filter equivalent pipelines
As shown above, a Proc object can be used in place of a Transformer. Connecting transformer procs to enums, sinks or other procs is special-cased to minimize overhead, so that something like
(1..9) >= proc{|x| x.to_s if x.even? } >= ""
is just syntactic sugar for
(1..9).lazy.select{|x| x.even? }.map{|x| x.to_s }.inject(""){|memo, x| memo << x }
(And yes, the latter could also be written more succinctly if Enumerator::Lazy provided operations like filter_map and join.)
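The plain-Ruby side of this equivalence can be checked without the library. The sketch below evaluates the long form from above and, for Ruby 2.7 or later where Enumerable#filter_map is available, an eager short form; the unary plus only makes sure the initial string is mutable.

```ruby
# Stdlib-only sketch of the select/map/inject form shown above.
long_form = (1..9).lazy
  .select { |x| x.even? }
  .map { |x| x.to_s }
  .inject(+"") { |memo, x| memo << x }

# Eager alternative on Ruby 2.7+: nil results are dropped by filter_map.
short_form = (1..9).filter_map { |x| x.to_s if x.even? }.join
```

Both expressions evaluate to "2468". A transformer proc that returns nil for an element therefore behaves like a filter, just as filter_map does.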
Links and Acknowledgements
Inspired by David M. Beazley’s Generator Tricks (Python) and Michael Snoyman’s conduit package (Haskell).
Compatibility
Tested with MRI 1.9.3p484, 2.0.0p384 and 2.2.0dev (trunk 47827) on Linux. Should work on other moderately recent versions and other operating systems. Will probably not work on all alternative Ruby implementations, because it depends on the fiber library.
Requiring 'coroutines' does some monkey patching to various core classes, which may be a compatibility issue. It is possible to load just the core module (Sink) and classes (Consumer and Transformer) using require 'coroutines/base'. Obviously, you will then have to instantiate Consumer and Transformer manually in order to use them, and you’ll have to be more explicit when constructing certain pipelines.
Patched core modules and classes are:
- Enumerable: add Enumerable#>= operator
- IO, Array, String: include Sink mixin
- Hash: define Hash#<< operator and include Sink mixin
- Method: define Method#<< operator, define Method#close as an alias for Method#receiver, and include Sink mixin
- Object: define Object#await, Object#consum_for and Object#trans_for
- Proc: define Proc#to_trans, Proc#<= and Proc#>=
- Symbol: define Symbol#to_trans, Symbol#<= and Symbol#>=