ConcurrentPipeline

Define a bunch of steps, run them concurrently (or not), recover your data from any step along the way. Rinse, repeat as needed.

Problem it solves

Occasionally I need to write a one-off script that has a number of independent steps, maybe does a bit of data aggregation, and reports some results at the end. In total, the script might take a long time to run. First, I'll write the full flow against a small set of data and get it working. Then I'll run it against the full dataset and find out 10 minutes later that I didn't handle a certain edge case. So I rework the script and rerun ... and 10 minutes later learn that there was another edge case I didn't handle.

The long feedback cycle here is painful and unnecessary. If I wrote all data to a file as it came in, when I encountered a failure, I could fix the handling in my code and resume where I left off, no longer needing to re-process all the steps that have already been completed successfully. This gem is my attempt to build a solution to that scenario.

Installation

Hey it's a gem, you know what to do. Ugh, fine I'll write it: Run gem install concurrent_pipeline to install it, or add it to your Gemfile!

Contributing

This code I've just written is already legacy code. Good luck!

License

WTFPL

Guide and Code Examples

The text above was written by a human. The text below was written by Monsieur Claude. Is it correct? Yeah, I guess probably, sure, let's go with "yep" ok?

Basic Example

Define a store with records, create a pipeline with processing steps, and run it:

require "concurrent_pipeline"

# Define your data store
store = ConcurrentPipeline::Store.define do
  storage(:yaml, dir: "/tmp/my_pipeline")

  record(:user) do
    attribute(:name)
    attribute(:processed, default: false)
  end
end

# Create some data
store.create(:user, name: "Alice")
store.create(:user, name: "Bob")

# Define processing pipeline
pipeline = ConcurrentPipeline::Pipeline.define do
  processor(:sync)  # Run sequentially

  process(:user, processed: false) do |user|
    puts "Processing #{user.name}"
    store.update(user, processed: true)
  end
end

# Run it
pipeline.process(store)

Async Processing

Use the :async processor to run steps concurrently:

pipeline = ConcurrentPipeline::Pipeline.define do
  processor(:async)  # Run concurrently

  process(:user, processed: false) do |user|
    # Each user is processed concurrently
    sleep 1
    store.update(user, processed: true)
  end
end

Control concurrency and polling with optional parameters:

pipeline = ConcurrentPipeline::Pipeline.define do
  # concurrency: max parallel tasks (default: 5)
  # enqueue_seconds: sleep between checking for new work (default: 0.1)
  processor(:async, concurrency: 10, enqueue_seconds: 0.5)

  process(:user, processed: false) do |user|
    # Up to 10 users processed concurrently
    expensive_api_call(user)
    store.update(user, processed: true)
  end
end
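
For intuition about what a concurrency cap means, here is a stdlib-only sketch of a fixed-size worker pool. It is not how concurrent_pipeline implements :async, and it does not model the enqueue_seconds polling; run_with_limit is a hypothetical helper made up for this illustration.

```ruby
# Hypothetical helper, not part of concurrent_pipeline: call the block for
# every item using at most `concurrency` worker threads.
# Assumes items are never nil or false (either would stop a worker early).
def run_with_limit(items, concurrency: 5, &block)
  queue = Queue.new
  items.each { |item| queue << item }
  queue.close # once drained, pop returns nil and the workers stop

  results = Queue.new
  workers = Array.new(concurrency) do
    Thread.new do
      while (item = queue.pop)
        results << block.call(item)
      end
    end
  end
  workers.each(&:join)

  Array.new(results.size) { results.pop } # completion order, not input order
end

squares = run_with_limit((1..20).to_a, concurrency: 10) { |n| n * n }
puts squares.sort.first(3).inspect # => [1, 4, 9]
```

Raising the cap only helps while the work is mostly waiting (API calls, disk, sleep); results come back in completion order, so sort or key them if order matters.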

Custom Methods on Records

Records can have custom methods defined in the record block:

store = ConcurrentPipeline::Store.define do
  storage(:yaml, dir: "/tmp/my_pipeline")

  record(:user) do
    attribute(:first_name)
    attribute(:last_name)
    attribute(:age)

    def full_name
      "#{first_name} #{last_name}"
    end

    def adult?
      age >= 18
    end
  end
end

store.create(:user, first_name: "Alice", last_name: "Smith", age: 25)
user = store.all(:user).first
puts user.full_name  # => "Alice Smith"
puts user.adult?     # => true
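
If you're curious how a record block can accept both attribute calls and plain def methods, here is one way such a DSL could be built in plain Ruby. This is a sketch under assumptions, not the gem's source: define_record and its internals are hypothetical names.

```ruby
# Hypothetical mini-DSL: `attribute` defines an accessor (with an optional
# default), and any `def` inside the block becomes an instance method because
# the block is evaluated with class_eval.
def define_record(&block)
  Class.new do
    def self.defaults
      @defaults ||= {}
    end

    def self.attribute(name, default: nil)
      defaults[name] = default
      attr_accessor name
    end

    def initialize(**attrs)
      self.class.defaults.merge(attrs).each { |key, value| public_send("#{key}=", value) }
    end

    class_eval(&block)
  end
end

User = define_record do
  attribute(:first_name)
  attribute(:last_name)
  attribute(:age)
  attribute(:processed, default: false)

  def full_name
    "#{first_name} #{last_name}"
  end

  def adult?
    age >= 18
  end
end

user = User.new(first_name: "Alice", last_name: "Smith", age: 25)
puts user.full_name # => Alice Smith
puts user.adult?    # => true
puts user.processed # => false
```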

Filtering Records

Use where to filter records, or pass filters directly to process:

# Manual filtering
pending_users = store.where(:user, processed: false, active: true)

# Filter with lambdas/procs for custom logic
even_ids = store.where(:user, id: ->(id) { id.to_i.even? })
adults = store.where(:user, age: ->(age) { age >= 18 })

# Combine regular values with lambda filters
active_adults = store.where(:user, active: true, age: ->(age) { age >= 18 })

# Or use filters in pipeline
pipeline = ConcurrentPipeline::Pipeline.define do
  processor(:sync)

  # Old style: pass a lambda
  process(-> { store.all(:user).select(&:active?) }) do |user|
    # ...
  end

  # New style: pass record name and filters
  process(:user, processed: false, active: true) do |user|
    # ...
  end
end
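
Mixing plain values and lambdas in one filter hash is easy to reason about once you remember that Proc#=== calls the proc. The sketch below shows that matching rule on plain hashes. It is an illustration only: matches? and where here are hypothetical stand-ins, not the gem's implementation.

```ruby
# Hypothetical matcher: a record (here a plain Hash) matches when every filter
# accepts the corresponding attribute. `===` compares plain values by equality
# and invokes lambdas with the attribute value.
def matches?(record, filters)
  filters.all? { |attr, filter| filter === record[attr] }
end

def where(records, **filters)
  records.select { |record| matches?(record, filters) }
end

USERS = [
  { id: 1, name: "Alice", age: 25, active: true },
  { id: 2, name: "Bob",   age: 17, active: true },
  { id: 3, name: "Carol", age: 40, active: false }
].freeze

active_adults = where(USERS, active: true, age: ->(age) { age >= 18 })
puts active_adults.map { |user| user[:name] }.inspect # => ["Alice"]
```

A side effect of using === in this sketch is that ranges and classes would also work as filters (for example age: 18..65), though nothing above says the gem supports that.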

Error Handling

When errors occur during async processing, they're collected in pipeline.errors and pipeline.process returns false:

pipeline = ConcurrentPipeline::Pipeline.define do
  processor(:async)

  process(:user, processed: false) do |user|
    raise "Something went wrong with #{user.name}" if user.name == "Bob"
    store.update(user, processed: true)
  end
end

result = pipeline.process(store)

unless result
  puts "Pipeline failed!"
  pipeline.errors.each { |error| puts error.message }
end
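
The collect-then-report behaviour can be sketched with plain threads: each task rescues its own exception, and the caller gets a boolean plus the list of errors. This is a stdlib-only illustration under assumptions; process_all is a hypothetical helper and not the gem's code.

```ruby
# Hypothetical runner: process every item in its own thread, collect
# exceptions instead of aborting, and return [success, errors].
def process_all(items, &block)
  errors = Queue.new # thread-safe collection

  threads = items.map do |item|
    Thread.new do
      begin
        block.call(item)
      rescue StandardError => e
        errors << e
      end
    end
  end
  threads.each(&:join)

  collected = Array.new(errors.size) { errors.pop }
  [collected.empty?, collected]
end

ok, errors = process_all(%w[Alice Bob Carol]) do |name|
  raise "Something went wrong with #{name}" if name == "Bob"
end

puts "Pipeline failed!" unless ok
errors.each { |error| puts error.message } # prints "Something went wrong with Bob"
```

The point of collecting rather than raising is that one bad record does not stop the others from finishing.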

Recovering from Failures

The store automatically versions your data. If processing fails, fix your code and restore from where you left off:

# First run - fails partway through
store = ConcurrentPipeline::Store.define do
  storage(:yaml, dir: "/tmp/my_pipeline")

  record(:user) do
    attribute(:name)
    attribute(:email)
    attribute(:email_sent, default: false)
  end
end

5.times { |i| store.create(:user, name: "User#{i}") }

pipeline = ConcurrentPipeline::Pipeline.define do
  processor(:async)

  process(:user, email_sent: false) do |user|
    # Oops, forgot to handle missing emails
    email = fetch_email_for(user.name)  # Might return nil!
    send_email(email)  # This will fail if email is nil
    store.update(user, email: email, email_sent: true)
  end
end

pipeline.process(store)  # Some succeed, some fail

# Check what versions exist
store.versions.each_with_index do |version, i|
  puts "Version #{i}: #{version.all(:user).count { |u| u.email_sent }} emails sent"
end

# Fix the code and restore from last version
last_version = store.versions.first
restored_store = last_version.restore

# Now run with fixed logic
pipeline = ConcurrentPipeline::Pipeline.define do
  processor(:async)

  process(:user, email_sent: false) do |user|
    email = fetch_email_for(user.name) || "[email protected]"  # Fixed!
    send_email(email)
    restored_store.update(user, email: email, email_sent: true)
  end
end

pipeline.process(restored_store)  # Only processes remaining users

Storage Structure

When using YAML storage, data is stored in a simple, human-readable file structure:

/tmp/my_pipeline/
├── data.yml              # Current state (always up-to-date)
└── versions/
    ├── 0001.yml          # Historical version 1
    ├── 0002.yml          # Historical version 2
    └── 0003.yml          # Historical version 3

  • data.yml: Contains the most recent state of your data. You can inspect this file at any time to see the current state.
  • versions/: Contains snapshots of previous versions. Each file is a complete snapshot at that point in time.

When you restore to a previous version, that version is copied to data.yml and any versions after it are deleted. You can then continue working from that restored state.
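
To make the restore rule concrete, here is a small stdlib sketch that mimics the layout described above: copy the chosen snapshot over data.yml, then delete every later snapshot. The helpers save, restore, and version_files are hypothetical, and the sketch assumes every write also produces a numbered snapshot, which may not match what the gem does internally.

```ruby
require "fileutils"
require "tmpdir"
require "yaml"

# Hypothetical helpers mirroring the directory layout; not the gem's code.
def version_files(dir)
  Dir.glob(File.join(dir, "versions", "*.yml")).sort
end

# Write the new state to data.yml and keep a numbered snapshot of it.
def save(dir, data)
  FileUtils.mkdir_p(File.join(dir, "versions"))
  File.write(File.join(dir, "data.yml"), YAML.dump(data))
  number = version_files(dir).size + 1
  File.write(File.join(dir, "versions", format("%04d.yml", number)), YAML.dump(data))
end

# Copy snapshot `number` (1-based) over data.yml and delete later snapshots.
def restore(dir, number)
  files = version_files(dir)
  FileUtils.cp(files.fetch(number - 1), File.join(dir, "data.yml"))
  FileUtils.rm(files.drop(number))
  YAML.load_file(File.join(dir, "data.yml"))
end

Dir.mktmpdir do |dir|
  save(dir, { "emails_sent" => 1 })
  save(dir, { "emails_sent" => 2 })
  save(dir, { "emails_sent" => 3 })

  puts restore(dir, 2)["emails_sent"]                              # => 2
  puts version_files(dir).map { |f| File.basename(f) }.inspect     # => ["0001.yml", "0002.yml"]
end
```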

Running Shell Commands

The Shell class helps run external commands within your pipeline. It exists because running shell commands in Ruby can be tedious: you need to capture stdout and stderr, check the exit status, and handle failures. Shell simplifies this.

It is available in process blocks via the shell helper:

pipeline = ConcurrentPipeline::Pipeline.define do
  processor(:sync)

  process(:repository, cloned: false) do |repo|
    # Shell.run returns a Result with stdout, stderr, success?, command
    result = shell.run("git clone #{repo.url} /tmp/#{repo.name}")

    if result.success?
      puts result.stdout
      store.update(repo, cloned: true)
    else
      puts "Failed: #{result.stderr}"
    end
  end
end

Use run! to raise on failure:

process(:repository, cloned: false) do |repo|
  # Raises an error if the command fails; returns stdout on success
  output = shell.run!("git clone #{repo.url} /tmp/#{repo.name}")
  store.update(repo, cloned: true, output: output)
end

Stream output in real-time with a block:

process(:project, built: false) do |project|
  shell.run("npm run build") do |stream, line|
    puts "[#{stream}] #{line}"
  end
  store.update(project, built: true)
end

Use Shell outside of pipelines by calling it directly:

# Check if a command succeeds
result = ConcurrentPipeline::Shell.run("which docker")
docker_installed = result.success?

# Get output or raise
version = ConcurrentPipeline::Shell.run!("ruby --version")
puts version  # => "ruby 3.2.9 ..."
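
For comparison, this is roughly the boilerplate a helper like Shell saves you from writing, sketched with Ruby's standard Open3 library. MiniShell and its Result struct are hypothetical stand-ins that borrow the field names mentioned above; the gem's real implementation may differ. The sketch assumes a Unix-like system with echo and sh available.

```ruby
require "open3"

# Hypothetical stand-in for a shell helper, built on Open3.capture3.
module MiniShell
  Result = Struct.new(:command, :stdout, :stderr, :status) do
    def success?
      status.success?
    end
  end

  module_function

  # Run the command and return a Result, whether or not it succeeded.
  def run(command)
    stdout, stderr, status = Open3.capture3(command)
    Result.new(command, stdout, stderr, status)
  end

  # Run the command; return stdout on success, raise on a non-zero exit.
  def run!(command)
    result = run(command)
    raise "Command failed: #{command}\n#{result.stderr}" unless result.success?

    result.stdout
  end
end

result = MiniShell.run("echo hello")
puts result.success? # => true
puts result.stdout   # => hello

failed = MiniShell.run("sh -c 'echo oops >&2; exit 3'")
puts failed.success? # => false
puts failed.stderr   # => oops
```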

Multiple Processing Steps

Chain multiple steps together - each step processes what the previous step created:

store = ConcurrentPipeline::Store.define do
  storage(:yaml, dir: "/tmp/my_pipeline")

  record(:company) do
    attribute(:name)
    attribute(:fetched, default: false)
  end

  record(:employee) do
    attribute(:company_name)
    attribute(:name)
    attribute(:processed, default: false)
  end
end

store.create(:company, name: "Acme Corp")
store.create(:company, name: "Tech Inc")

pipeline = ConcurrentPipeline::Pipeline.define do
  processor(:async)

  # Step 1: Fetch employees for each company
  process(:company, fetched: false) do |company|
    employees = api_fetch_employees(company.name)
    employees.each do |emp|
      store.create(:employee, company_name: company.name, name: emp)
    end
    store.update(company, fetched: true)
  end

  # Step 2: Process each employee
  process(:employee, processed: false) do |employee|
    send_welcome_email(employee.name)
    store.update(employee, processed: true)
  end
end

pipeline.process(store)
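
One way to picture "each step processes what the previous step created" is a loop that keeps scanning all steps until nothing is pending. The in-memory sketch below does exactly that with plain structs. It is only a mental model: run_steps, the structs, and the roster hash are hypothetical, and the gem's actual scheduling may work differently.

```ruby
# Hypothetical in-memory stand-ins for the store's records.
Company  = Struct.new(:name, :fetched)
Employee = Struct.new(:company_name, :name, :processed)

# Keep making passes over both steps until no record matches any filter.
# `roster` stands in for the API that returns employee names per company.
def run_steps(companies, employees, roster)
  loop do
    pending_companies = companies.reject(&:fetched)
    pending_employees = employees.reject(&:processed)
    break if pending_companies.empty? && pending_employees.empty?

    # Step 1: create employee records for each unfetched company.
    pending_companies.each do |company|
      roster.fetch(company.name, []).each do |name|
        employees << Employee.new(company.name, name, false)
      end
      company.fetched = true
    end

    # Step 2: process the employees that were pending at the start of this pass.
    pending_employees.each { |employee| employee.processed = true }
  end
  employees
end

companies = [Company.new("Acme Corp", false), Company.new("Tech Inc", false)]
roster = { "Acme Corp" => %w[Ann Ben], "Tech Inc" => %w[Cy] }

employees = run_steps(companies, [], roster)
puts employees.size                  # => 3
puts employees.all?(&:processed)     # => true
```

Employees created in the first pass are only picked up in the second, which is why the loop continues until both filters come back empty.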

Final words

That's it, you've reached THE END OF THE INTERNET.