queue-processor

Queue Processor is a calculation model built on top of Delayed Job. It is designed to process a single root calculation followed by a number of dependent calculations.

Installation

Add queue-processor and state_machine to your Gemfile. Note that a specific version of state_machine is required until https://github.com/pluginaweek/state_machine/pull/255 is resolved.

gem 'queue-processor', :git => 'https://github.com/GoodMeasuresLLC/queue-processor.git'
gem 'state_machine', :git => 'https://github.com/GoodMeasuresLLC/state_machine.git'

Then run bundle install.

Requirements

You must have 3 models:

  1. The RootCalculation: This is the calculation that everything depends on. Performing a new RootCalculation will cause everything in progress for the old calculation to be aborted. For example, a WeeklyCalculation that determines the user's score for the week against their goals.
  2. A DependentCalculationGroup: These dependent calculation groups are queued and processed after the RootCalculation is performed. DependentCalculationGroups simply queue additional work, partitioned and prioritized. For example, a DailyCalculation could queue HourlyCalculations prioritized such that tomorrow's calculations run first. When all the HourlyCalculations complete, they are made available as a group through the DailyCalculation. When the last DailyCalculation completes, the RootCalculation is marked as finished.
  3. A DependentCalculation: A calculation that depends on the RootCalculation being performed. DependentCalculations are made available in groups, and when all are finished, the RootCalculation is marked as complete. For example, an HourlyCalculation could be performed for a particular hour and day of the week. These would be grouped by day of the week, and when all the hourly calculations for a day are complete, that day's dependent calculation group would be marked as complete. When all the hours for the whole week are done, the RootCalculation would be marked as complete.
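
The completion roll-up described above (hours complete a day, days complete the week) can be illustrated with a minimal plain-Ruby sketch. This is not the gem's implementation or API: SketchRoot, SketchGroup, finish and group_finished are hypothetical names used only for illustration, and nothing here touches Delayed Job or the database.

```ruby
# Illustration only: plain Ruby objects, not queue-processor classes.

# Stands in for a dependent calculation group (one day of hourly work).
class SketchGroup
  attr_reader :day

  def initialize(day, hours, root)
    @day = day
    @pending = hours.dup # hours that have not finished yet
    @root = root
  end

  # Mark one hour as finished; tell the root once the whole group is done.
  def finish(hour)
    @pending.delete(hour)
    @root.group_finished(self) if complete?
  end

  def complete?
    @pending.empty?
  end
end

# Stands in for the root calculation (one week of daily groups).
class SketchRoot
  attr_reader :groups

  def initialize(days:, hours:)
    @finished_days = []
    @groups = days.map { |day| SketchGroup.new(day, hours, self) }
  end

  # Record a finished group once, even if it reports completion again.
  def group_finished(group)
    @finished_days << group.day unless @finished_days.include?(group.day)
  end

  # The root is complete only when every group has reported in.
  def complete?
    @finished_days.size == @groups.size
  end
end

hours = (0...24).to_a
root = SketchRoot.new(days: (0...7).to_a, hours: hours)

# Finish every hour of the first day: that group completes, the root does not.
hours.each { |h| root.groups.first.finish(h) }
first_day_done = root.groups.first.complete?
root_done_early = root.complete?

# Finish the remaining six days: now the root completes.
root.groups.drop(1).each { |g| hours.each { |h| g.finish(h) } }
root_done = root.complete?
```

In the sketch, a group becomes available as soon as its own hours are done, while the root waits for all seven groups.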

Your models implement methods for performing the calculations and queuing the dependent work.

Examples

class WeeklyCalculation < ActiveRecord::Base
  include QueueProcessor::RootCalculation
  root_calculation_config do
    self.dependent_calculation_group_association = :daily_calculations
    self.run_at = lambda {|run| Time.now + 0.005.seconds}
    self.priority = lambda {|run| 0}
  end

  has_many :daily_calculations, :dependent => :destroy

  # Required hook to perform the calculation associated with the Root Calculation
  def perform_calculation
    update(:computed_value => rand())
  end

  # Required hook to create and queue the dependent work
  def create_and_enqueue_dependent_calculations
    (0...7).map do |n|
      self.daily_calculations.create!(:day => n)
    end.each(&:add_to_queue)
  end
end
class DailyCalculation < ActiveRecord::Base
  include QueueProcessor::DependentCalculationGroup
  dependent_calculation_group_config do
    self.dependent_calculation_association = :hourly_calculations
    self.parent_calculation = :weekly_calculation
    self.priority = lambda {|daily_calculation| 1}
  end

  belongs_to :weekly_calculation
  has_many :hourly_calculations, :dependent => :destroy

  # Required hook to create and queue dependent work, partitioned to this group
  def create_and_enqueue_dependent_calculations
    (0...24).map do |n|
      self.hourly_calculations.create!(:hour => n)
    end.each(&:add_to_queue)
  end

  # Callback invoked when a calculation finishes, so that any older calculations for this day can be cleaned up
  def cleanup_old_calculations
    self.with_lock do
      query = DailyCalculation.where(:weekly_calculation_id => self.parent_calculation.id, :day => self.day).where("id < ?", self.id)
      query.destroy_all
    end
  rescue ActiveRecord::RecordNotFound
    # This record no longer exists (for example, it was already destroyed), so there is nothing to clean up
  end
end
class HourlyCalculation < ActiveRecord::Base
  include QueueProcessor::DependentCalculation
  dependent_calculation_config do
    self.parent_calculation = :daily_calculation
    self.priority = lambda {|hourly_calculation| 2}
  end

  belongs_to :daily_calculation

  # Required hook that computes the hourly value
  def perform_calculation
    update(:computed_value => rand())
  end
end
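
The priority lambdas in these examples return constants. As a rough sketch of the "tomorrow first" ordering mentioned under Requirements, a priority could instead be derived from the day index. The helper sketch_day_priority below is hypothetical and not part of queue-processor; it assumes days are numbered 0 through 6 and relies on Delayed Job running lower priority numbers first.

```ruby
# Hypothetical helper, not part of queue-processor.
# Returns 0 for tomorrow, 1 for the day after, and so on up to 6 for today.
# Assumes day and today are integers in 0..6.
def sketch_day_priority(day, today)
  (day - today - 1) % 7
end

today = 2
# Sort the day indexes by priority; the lowest number would run first.
order = (0...7).sort_by { |day| sketch_day_priority(day, today) }
# order => [3, 4, 5, 6, 0, 1, 2]
```

A lambda in dependent_calculation_group_config could call such a helper with the record's day attribute, assuming the lambda receives the DailyCalculation as shown in the example above.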