Conflow

Gem Version Build Status Maintainability Test Coverage

Conflow allows defining complicated workflows with dependencies. Inspired by Gush (the idea) and Redis::Objects (the implementation) it focuses solely on dependency logic, while leaving queueing jobs and executing them entirely in hands of the programmer.

Please have a look at Gush if you already use Rails and ActiveJob - it might suit your needs better.

Installation

Add this line to your application's Gemfile:

gem "conflow"

And then execute:

$ bundle

Or install it yourself as:

$ gem install conflow

Usage

Configuration

Redis connection

To configure Redis connection, set Conflow.redis attribute to a Redis or ConnectionPool instance.

Conflow.redis = Redis.new(host: "127.0.0.1", port: 6379)
# or
require "connection_pool"
Conflow.redis = ConnectionPool.new(size: 5, timeout: 5) { Redis.new(host: "127.0.0.1", port: 6379) }

Redis script caching

By default, gem caches it's scripts in Redis server. To disable this behaviour, set cache_scripts to false:

Conflow::Redis::Scripts.cache_scripts = false

Defining flows

In order to define a flow, first you need to supply a way to enqueue jobs.

Conflow does not make any assumptions about this process - you can enqueue Sidekiq job, send a RabbitMQ event or send an email to a Very Important Person with flow ID and job ID.

class ApplicationFlow < Conflow::Flow
  def queue(job)
    Sidekiq::Client.enqueue(FlowWorkerJob, id, job.id)
  end
end

id (Conflow::Flow#id) and job.id (Conflow::Job#id) is enough to identify job and execute it properly. Make sure that you send both of these values and it will be OK.

You can define actual jobs to be performed using #configure method:

class MyFlow < ApplicationFlow
  def configure(id:, strict:)
    run UpsertJob, params: { id: id }
    run CheckerJob, params: { id: id }, after: UpsertJob if strict
  end
end

To create flow, use .create method:

MyFlow.create(id: 320, strict: false)
MyFlow.create(id: 15, strict: true)

Dependencies

You can use after option to define dependencies. after accepts a Class, Conflow::Job instance or Integer with id of the job - or an array with any combination of these.

class MyFlow < ApplicationFlow
  def configure
    first = run FirstJob
    independent = run IndependentJob

    run SecondJob, after: [FirstJob, independent]
    run FinishUp, after: SecondJob
  end
end

Created graph

Promises

In order to use other Job's result as parameter of another job, use Futures:

class MyFlow < ApplicationFlow
  def configure
    first = run FirstJob
    run SecondJob, params: { object_id: first.outcome[:id] }
  end
end

Note that SecondJob will automatically depend on FirstJob. When FirstJob finishes, it is expected to return hash: { id: "<some object>" }.

Returned object must be serializable with JSON in order to be properly persisted and handled by Redis script which resolves promises.

If FirstJob returns { id: 14 }, SecondJob will be run with { object_id: 14 } parameter.

Performing jobs

To perform job, use Conflow::Worker mixin. It adds #perform method, which accepts two arguments: IDs of the flow and the job.

Simple Conflow::Worker that is also Sidekiq::Worker:

class FlowWorkerJob
  include Conflow::Worker
  include Sidekiq::Worker # order is important!

  def perform(flow_id, job_id)
    super do |worker_class, params|
      worker_class.new(params).call
    end
  end
end

For previously defined flow, executing this flow would result in:

FirstJob.new({}).call
IndependentJob.new({}).call # order of the first two is not defined

SecondJob.new({}).call

FinishUp.new({}).call

Theory

The main idea of the gem is, obviously, a directed graph containing information about dependencies. It is stored in Redis in following fields:

  • conflow:job:<id>:successors - (List) containing IDs of jobs which depend on <id>
  • conflow:flow:<id>:indegee - (Sorted Set) set of all unqueued jobs with score representing how many dependencies are not yet fulfilled

There are three main actions that can be performed on this graph (Redis-wise):

  1. Queue jobs Removes all jobs with score 0 from :indegree set
  2. Complete job Decrement scores of all of the job's successors by one
  3. Add job Add job ID to :successors list for all jobs on which it depends and add job itself to :indegree set

All of these actions are performed via eval/evalsha - it lifts problems with synchronization (as scripts are executed as if in transaction) and significantly reduces amount of requests made to Redis.

Development

After checking out the repo, run bin/setup to install dependencies. Then, run rake spec to run the tests. You can also run bin/console for an interactive prompt that will allow you to experiment.

To install this gem onto your local machine, run bundle exec rake install. To release a new version, update the version number in version.rb, and then run bundle exec rake release, which will create a git tag for the version, push git commits and tags, and push the .gem file to rubygems.org.

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/[USERNAME]/conflow. This project is intended to be a safe, welcoming space for collaboration, and contributors are expected to adhere to the Contributor Covenant code of conduct.

License

The gem is available as open source under the terms of the MIT License.

Code of Conduct

Everyone interacting in the Conflow project’s codebases, issue trackers, chat rooms and mailing lists is expected to follow the code of conduct.