Async::Enumerable
Async::Enumerable extends Ruby's Enumerable module with asynchronous capabilities, allowing you to perform operations in parallel using the socketry/async library.
Installation
In your bundler-managed project, run
bundle add async-enumerable
Or to install globally:
gem install async-enumerable
Usage
Async::Enumerable adds an .async method to any Enumerable object, which returns
an Async::Enumerator that performs operations in parallel:
require 'async/enumerable'
# Convert any enumerable to async
results = [1, 2, 3, 4, 5].async.map { |n| n * 2 }
# => [2, 4, 6, 8, 10]
# Works with any enumerable
(1..100).async.select { |n| expensive_check(n) }
# Chain async operations
data.async
.select { |item| item.valid? }
.map { |item| process(item) }
.take(10)
Including in Your Classes
Async::Enumerable can be included in your own classes to add async capabilities:
class TodoList
include Async::Enumerable
def_async_enumerable :@todos # Specify which ivar/method returns the enumerable
def initialize
@todos = []
end
def add(todo)
@todos << todo
self
end
attr_reader :todos
end
list = TodoList.new
list.add("Buy milk").add("Write code").add("Review PR")
# Process todos asynchronously
completed = list.async.map { |todo| process_todo(todo) }.sync
You can also specify a default fiber limit for your class:
class ApiClient
include Async::Enumerable
def_async_enumerable :@endpoints, max_fibers: 10 # Limit concurrent requests
attr_reader :endpoints
end
Parallel Execution
The main benefit of Async::Enumerable is parallel execution of block operations:
require 'async/enumerable'
require 'net/http'
urls = [
'https://api.github.com/users/ruby',
'https://api.github.com/users/rails',
'https://api.github.com/users/matz'
]
# Synchronous - requests made sequentially
responses = urls.map do |url|
Net::HTTP.get(URI(url))
end
# Asynchronous - requests made in parallel
responses = urls.async.map do |url|
Net::HTTP.get(URI(url))
end
Supported Methods
Async::Enumerable supports all Enumerable methods through different strategies:
Methods with Async Implementations
Most Enumerable methods work automatically through the async implementation of each:
map,select,reject,collect,filter,filter_mapreduce,inject,sum,min,max,minmaxcount,tally,group_by,partitionsort,sort_by,uniq,compactto_a,to_h,entrieseach_with_index,each_with_object,with_indexzip,cycle,chunk,slice_*
These methods automatically benefit from parallel execution when blocks contain I/O or expensive operations.
Methods with Optimized Early Termination
The Predicates module provides optimized implementations that stop processing as soon as the result is determined:
all?- Returns true if all elements match (stops on first false)any?- Returns true if any element matches (stops on first true)none?- Returns true if no elements match (stops on first true)one?- Returns true if exactly one element matchesinclude?/member?- Check if collection includes a valuefind/detect- Find first matching element *find_index- Find index of matching element *
* Important: find and find_index return the fastest completing result, not necessarily the first element in order. See Parallel Execution Behavior below.
Methods Delegated to Synchronous Implementation
Some methods are inherently sequential and are delegated back to the wrapped enumerable:
first- Takes elements from the beginningtake- Takes first n elementstake_while- Must evaluate elements in orderlazy- Returns a standard lazy enumerator (lazy evaluation uses break internally, incompatible with async)
Method Chaining
Async::Enumerator maintains the async context through method chains. Transformation methods like map, select, and reject return new Async::Enumerator instances, allowing you to chain multiple operations while staying in "async land":
# Chain stays async until .sync
result = [1, 2, 3, 4, 5].async
.map { |x| expensive_operation(x) } # Returns Async::Enumerator
.select { |x| x > threshold } # Returns Async::Enumerator
.map { |x| transform(x) } # Returns Async::Enumerator
.sync # Returns Array
# The .sync method explicitly converts back to an array
data = urls.async
.map { |url| fetch_data(url) }
.select { |data| data.valid? }
.sync # Get final results as array
Async::Enumerator also implements comparison operators, so it can be compared directly with arrays:
# Equality comparison works without .sync
async_result = [1, 2, 3].async.map { |x| x * 2 }
async_result == [2, 4, 6] # => true
# This makes testing clean and simple
expect(data.async.select { |x| x.valid? }).to eq(expected_valid_items)
Module Structure
Async::Enumerable is organized into logical modules for better maintainability and selective inclusion:
Async::Enumerable::Methods::Transformers- Methods that transform collections (map, select, reject, etc.)Async::Enumerable::Methods::Predicates- Methods that test conditions with early termination (all?, any?, none?, one?, include?, find, find_index)Async::Enumerable::Methods::Converters- Methods that convert to other types (to_a, sync)Async::Enumerable::Methods::Aggregators- Aggregation methods inherited from Enumerable (reduce, sum, count, etc.)Async::Enumerable::Methods::Iterators- Iteration helpers inherited from Enumerable (each_with_index, each_cons, etc.)Async::Enumerable::Methods::Slicers- Slicing/filtering methods inherited from Enumerable (drop, grep, partition, etc.)Async::Enumerable::ConcurrencyBounder- Cross-cutting concern for limiting concurrent fibersAsync::Enumerable::Configurable- Configuration management system with hierarchical config inheritance and collection resolutionAsync::Enumerable::Comparable- Comparison operators for async enumerators
You can selectively include specific modules if needed:
class CustomAsync
include Enumerable
include Async::Enumerable::Methods::Transformers::Map
include Async::Enumerable::Methods::Converters::Sync
include Async::Enumerable::ConcurrencyBounder
# Only has async map and sync methods
end
Parallel Execution Behavior
Due to the parallel nature of async operations, some methods behave differently than their synchronous counterparts:
Find and Find_index Return Fastest Result
When using find, detect, or find_index with async enumeration, the result returned is the first to complete evaluation, not necessarily the first element in the collection order:
# Synchronous - always returns 3 (first element > 2)
[1, 2, 3, 4, 5].find { |n| n > 2 } # => 3
# Async - returns whichever completes first
[1, 2, 3, 4, 5].async.find { |n|
sleep(6 - n) # Element 5 completes first
n > 2
} # => Could be 3, 4, or 5 (likely 5 due to shorter sleep)
This is a performance optimization - as soon as any matching element is found, the search terminates immediately without waiting for earlier elements to complete.
When Order Matters
If you need the first element by position rather than the fastest to evaluate, you have several options:
# Option 1: Use synchronous enumeration
collection.find { |item| expensive_check(item) }
# Option 2: Process in order then find
collection.async.map { |item| [item, expensive_check(item)] }
.find { |item, result| result }
&.first
# Option 3: Use with_index to track position
matches = collection.async.filter_map.with_index do |item, index|
expensive_check(item) ? [index, item] : nil
end
first_match = matches.min_by { |index, _| index }&.last
This behavior applies to:
find/detect- Returns fastest matching elementfind_index- Returns index of fastest matching element
Other predicates like all?, any?, none?, one?, and include? return boolean values, so the order doesn't affect the result.
When to Use Async
Async::Enumerable is beneficial when:
- Operations in the block are I/O bound (network requests, file operations)
- You have a large collection with expensive operations per element
- The order of results doesn't matter, or you're collecting all results
Async::Enumerable may not help (and could be slower) when:
- Operations are very fast/simple
- Order of execution matters (for find/find_index)
- Operations depend on previous results
- The overhead of concurrency exceeds the operation cost
Examples
Parallel API Requests
user_ids = [1, 2, 3, 4, 5]
# Fetch user data in parallel
users = user_ids.async.map do |id|
fetch_user_from_api(id)
end
Parallel File Processing
file_paths = Dir.glob("*.txt")
# Process files in parallel
results = file_paths.async.map do |path|
process_file(File.read(path))
end
Early Termination with any?
# Stops as soon as one valid item is found
has_valid = items.async.any? do |item|
expensive_validation(item)
end
Finding in Parallel
# Searches in parallel, stops when found
result = large_dataset.async.find do |record|
complex_matching_logic(record)
end
Benchmarks
The gem includes benchmarks that demonstrate the performance characteristics of async operations compared to synchronous ones.
Performance Results
When operations involve IO (simulated with sleep delays), async methods show significant performance improvements that scale with collection size:
Collection Size Comparison
| Collection Size | Sync (i/s) | Async (i/s) | Speedup |
|---|---|---|---|
| 10 items | 159.8 | 924.7 | 5.8x faster |
| 100 items | 15.8 | 325.2 | 20.6x faster |
| 1000 items | 7.8 | 44.7 | 5.8x faster |
Note: For 1000+ items, using max_fibers can provide additional optimization
Early Termination Performance
Even methods that can terminate early benefit from async execution:
| Method | Scenario | Sync (i/s) | Async (i/s) | Speedup |
|---|---|---|---|---|
any? |
Early match | 265.5 | 1190.9 | 4.5x faster |
any? |
Late match | 16.5 | 351.8 | 21.3x faster |
find |
Middle element | 31.8 | 412.5 | 13.0x faster |
Max Fibers Configuration
For very large collections, limiting concurrent fibers can improve performance:
# Default (1024 fibers max)
(1..10000).async.map { |n| process(n) }
# Limited to 100 concurrent fibers
(1..10000).async(max_fibers: 100).map { |n| process(n) }
# Configure global default
Async::Enumerable.configure { |c| c.max_fibers = 100 }
# Configure at class level
class MyClass
include Async::Enumerable
def_async_enumerable :@data, max_fibers: 50
end
Running Benchmarks
# Run detailed benchmarks with organized comparisons
bundle exec rake benchmark
# Run quick performance overview
bundle exec rake benchmark_quick
# Run specific benchmark files
bundle exec benchmark-driver benchmark/size_comparison/map_100.yaml
bundle exec benchmark-driver benchmark/early_termination/any_early.yaml
Benchmark Structure
The benchmarks are organized into two categories:
Size Comparison Benchmarks (benchmark/size_comparison/)
Compare sync vs async performance across different collection sizes (10, 100, 1000, 10000 items):
- map_*.yaml - Tests parallel transformation performance at different scales
Early Termination Benchmarks (benchmark/early_termination/)
Test methods that can stop processing early:
- any_early.yaml - Tests
any?with early match - any_late.yaml - Tests
any?with late match - find_middle.yaml - Tests
findwith middle element match
The benchmarks simulate IO operations using scaled sleep delays to demonstrate real-world performance benefits.
Writing Custom Benchmarks
You can create your own benchmarks using benchmark-driver's YAML format:
prelude: |
require 'async/enumerable'
def expensive_operation(n)
sleep(rand / 100.0) # Simulate 0-10ms IO
n * 2
end
data = (1..100).to_a
benchmark:
sync: data.map { |n| expensive_operation(n) }
async: data.async.map { |n| expensive_operation(n) }
Development
After checking out the repo, run bin/setup to install dependencies. Then, run rake spec to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.
To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and the created tag, and push the .gem file to rubygems.org.
Contributing
Bug reports and pull requests are welcome on GitHub at https://github.com/jetpks/async-enumerable. This project is intended to be a safe, welcoming space for collaboration, and contributors are expected to adhere to the code of conduct.
License
The gem is available as open source under the terms of the MIT License.
Code of Conduct
Everyone interacting in the Async::Enumerable project's codebases, issue trackers, chat rooms and mailing lists is expected to follow the code of conduct.