Disc Build Status

Disc fills the gap between your Ruby service objects and antirez's wonderful Disque backend.

Disc Wars!

Basic Usage

  1. Install the gem
  $ gem install disc
  1. Write your jobs
  require 'disc'

  class CreateGameGrid
    include Disc::Job
    disc queue: 'urgent'

    def perform(type)
      # perform rather lengthy operations here.
    end
  end
  1. Enqueue them to perform them asynchronously
  CreateGameGrid.enqueue('light_cycle')
  1. Create a file that requires anything needed for your jobs to run
# disc_init.rb
  require 'disc/worker'
  Dir['./jobs/**/*.rb'].each { |job| require job }
  1. Run as many Disc Worker processes as you wish, requiring your disc_init.rb file
  $ QUEUES=urgent,default disc -r ./disc_init.rb
  1. Or enqueue them to be performed at some time in the future, or on a queue other than it's default.
  CreateGameGrid.enqueue(
    'disc_arena',
    at: DateTime.new(2020, 12, 31),
    queue: 'not_so_important'
  )

Disc Jobs

Disc::Job is a module you can include in your Ruby classes, this allows a Disc worker process to execute the code in them by adding a class method (#enqueue) with the following signature:

def enqueue(arguments, at: nil, queue: nil, **options)
end

Signature documentation follows:

## Disc's `#enqueue` is the main user-facing method of a Disc job, it
#  enqueues a job with a given set of arguments in Disque, so it can be
#  picked up by a Disc worker process.
#
## Parameters:
#
## `arguments`  - an optional array of arguments with which to execute
#                 the job's #perform method.
#
# `at`          - an optional named parameter specifying a moment in the
#                 future in which to run the job, must respond to
#                 `#to_time`.
#
## `queue`      - an optional named parameter specifying the name of the
#                 queue in which to store the job, defaults to the class
#                 Disc queue or to 'default' if no Disc queue is specified
#                 in the class.
#
##  `**options` - an optional hash of options to forward internally to
#                 [disque-rb](https://github.com/soveran/disque-rb)'s
#                 `#push` method, valid options are:
#
##  `replicate: <count>`  - specifies the number of nodes the job should
#                           be replicated to.
#
### `delay: <sec>`        - specifies a delay time in seconds for the job
#                           to be delivered to a Disc worker, it is ignored
#                           if using the `at` parameter.
#
### `ttl: <sec>`          - specifies the job's time to live in seconds:
#                           after this time, the job is deleted even if
#                           it was not successfully delivered. If not
#                           specified, the default TTL is one day.
#
### `maxlen: <count>`     - specifies that if there are already <count>
#                           messages queued for the specified queue name,
#                           the message is refused.
#
### `async: true`         - asks the server to let the command return ASAP
#                           and replicate the job to other nodes in the background.
#
#
### CAVEATS
#
## For convenience, any object can be passed as the `arguments` parameter,
#  `Array.wrap` will be used internally to preserve the array structure.
#
## The `arguments` parameter is serialized for storage using `Disc.serialize`
#  and Disc workers picking it up use `Disc.deserialize` on it, both methods
#  use standard library json but can be overriden by the user
#

You can see Disque's ADDJOB documentation for more details

When a Disc worker process is assigned a job, it will create a new intance of the job's class and execute the #perform method with whatever arguments were previously passed to #enqueue.

Example:

class ComplexJob
  include Disc::Job
  disc queue: 'urgent'

  def perform(first_parameter, second_parameter)
    # do things...
  end
end


ComplexJob.enqueue(['first argument', { second: 'argument' }])

Settings

Disc takes its configuration from environment variables.

ENV Variable Default Value Description
QUEUES 'default' The list of queues that Disc::Worker will listen to, it can be a single queue name or a list of comma-separated queues
DISC_CONCURRENCY '25' Amount of threads to spawn when Celluloid is available.
DISQUE_NODES 'localhost:7711' This is the list of Disque servers to connect to, it can be a single node or a list of comma-separated nodes
DISQUE_AUTH '' Authorization credentials for Disque.
DISQUE_TIMEOUT '100' Time in milliseconds that the client will wait for the Disque server to acknowledge and replicate a job
DISQUE_CYCLE '1000' The client keeps track of which nodes are providing more jobs, after the amount of operations specified in cycle it tries to connect to the preferred node.

Error handling

When a job raises an exception, Disc.on_error is invoked with the error and the job data. By default, this method prints the error to standard error, but you can override it to report the error to your favorite error aggregator.

# On disc_init.rb
def Disc.on_error(exception, job)
  # ... report the error
end

Dir["./jobs/**/*.rb"].each { |job| require job }

Job Definition

The error handler function gets the data of the current job as a Hash, that has the following schema.

'class' (String) The Job class.
'arguments' (Array) The arguments passed to perform.
'queue' (String) The queue from which this job was picked up.
'id' (String) Disque's job ID.

[Optional] Celluloid integration

Disc workers run just fine on their own, but if you happen to be using Celluloid you might want Disc to take advantage of it and spawn multiple worker threads per process, doing this is trivial! Just require Celluloid before your init file:

$ QUEUES=urgent,default disc -r celluloid/current -r ./disc_init.rb

Whenever Disc detects that Celluloid is available it will use it to spawn a number of threads equal to the DISC_CONCURRENCY environment variable, or 25 by default.

[Optional] Rails and ActiveJob integration

You can use Disc easily in Rails without any more hassle, but if you'd like to use it via ActiveJob you can use the adapter included in this gem.

# Gemfile
gem 'disc'

# config/application.rb
module YourApp
  class Application < Rails::Application
    require 'active_job/queue_adapters/disc_adapter'
    config.active_job.queue_adapter = :disc
  end
end

# app/jobs/clu_job.rb

class CluJob < ActiveJob::Base
  queue_as :urgent

  def perform(*args)
    # Try to take over The Grid here...
  end
end

# disc_init.rb
require ::File.expand_path('../config/environment', __FILE__)

# Wherever you want
CluJob.perform_later(a_bunch_of_arguments)

Disc is run in the exact same way, for this example it'd be:

$ QUEUES=urgent disc -r ./disc_init.rb

A note on stability

The version of Disque at the time of this writing is 0.0.1. It is a wonderful project, I know of people running it in production and I expect it will only get better and better with time, but please do not expect Disc to be more production ready than Disque is.

Similar Projects

If you want to use Disque but Disc isn't cutting it for you then you should take a look at Havanna, a project by my friend @djanowski.

License

The code is released under an MIT license. See the LICENSE file for more information.

Acknowledgements

  • To @foca for helping me ship a quality thing and putting up with my constant whining.
  • To @antirez for Redis, Disque, and his refreshing way of programming wonderful tools.
  • To @soveran for pushing me to work on this and publishing gems that keep me enjoying ruby.
  • To all contributors

Sponsorship

This open source tool is proudly sponsored by 13Floor

13Floor