Class: Litejobqueue

Inherits:
Litequeue
Includes:
Litemetric::Measurable
Defined in:
lib/litestack/litejobqueue.rb

Overview

Litejobqueue is a job queueing and processing system designed for Ruby applications. It is built on top of SQLite, which is an embedded relational database management system that is lightweight and fast.

One of the main benefits of Litejobqueue is that it is very low on resources, making it an ideal choice for applications that need to manage a large number of jobs without incurring high resource costs. In addition, because it is built on SQLite, it is easy to use and does not require any additional configuration or setup.

Litejobqueue also integrates well with various I/O frameworks like Async and Polyphony, making it a great choice for Ruby applications that use these frameworks. It provides a simple and easy-to-use API for adding jobs to the queue and for processing them.

Overall, Litejobqueue is an excellent choice for Ruby applications that require a lightweight, embedded job queueing and processing system that is fast, efficient, and easy to use.

Constant Summary

DEFAULT_OPTIONS =

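The default options for the job queue. They can be overridden by passing a hash of new options to Litejobqueue.new; the options are then also passed on to the underlying Litequeue object. The annotated list below comes from the source comment and differs in places from the hash literal that follows it (for example path, workers and sleep_intervals); the hash literal is the value the constant actually holds.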

config_path: "./litejob.yml" -> where to find the configuration file (if any)
path: "./db/queue.db"
mmap_size: 128 * 1024 * 1024 -> 128MB to be held in memory
sync: 1 -> sync only when checkpointing
queues: [["default", 1, "spawn"]] -> an array of queues to process
workers: 1 -> number of job processing workers
sleep_intervals: [0.001, 0.005, 0.025, 0.125, 0.625, 3.125] -> sleep intervals for workers
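The sleep_intervals option reads like a back-off schedule for idle workers. The sketch below is one plausible interpretation, not the library's code: it assumes a worker steps to the next, longer interval after each poll that finds no jobs and returns to the shortest interval once it processes something. The helper name next_sleep_index is hypothetical.

```ruby
# Interval list copied from the option description above.
SLEEP_INTERVALS = [0.001, 0.005, 0.025, 0.125, 0.625, 3.125].freeze

# Hypothetical helper (assumption, not a Litejobqueue method):
# returns the index of the interval to sleep for on the next idle poll.
def next_sleep_index(current_index, processed_jobs, intervals = SLEEP_INTERVALS)
  return 0 if processed_jobs > 0                # work was found: reset to the shortest interval
  [current_index + 1, intervals.length - 1].min # idle: step up, capped at the last entry
end

# Simulate six idle polls followed by one poll that processed jobs.
index = 0
slept = []
6.times do
  slept << SLEEP_INTERVALS[index] # a real worker would call sleep here
  index = next_sleep_index(index, 0)
end
index_after_work = next_sleep_index(index, 3)
```

Under this assumption an idle worker quickly settles on the longest interval, which keeps polling cheap, while a busy worker keeps polling at the shortest one.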

Queues are processed according to priority. For example, given the following queues:

queues: [["default", 1, "spawn"], ["urgent", 10]]

This means that, roughly, when the queues are full, 1 default job is processed for every 10 urgent jobs. The priority value is mandatory. The optional “spawn” parameter tells the job workers to spawn a separate execution context (a thread or a fiber, depending on the environment) for each job. This can be particularly useful for long-running, IO-bound jobs. It is not recommended for threaded environments, though, as it can create many threads that may consume a lot of memory.
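The 10-to-1 ratio can be illustrated with a small simulation. This is a sketch under assumptions: it supposes each worker pass takes at most "priority" jobs from a queue before moving on to the next one, and it uses in-memory arrays (the hypothetical pending hash) in place of the SQLite-backed queues.

```ruby
# Priority levels in descending order, in the shape built by #initialize:
# [[priority, [[queue_name, spawn?], ...]], ...]
levels = [[10, [["urgent", false]]], [1, [["default", true]]]]

# Hypothetical in-memory stand-in for the persisted queues.
pending = {
  "urgent" => Array.new(25) { |i| "u#{i}" },
  "default" => Array.new(5) { |i| "d#{i}" }
}

processed = []
2.times do # two passes of the (assumed) worker loop
  levels.each do |priority, queues|
    queues.each do |name, _spawn|
      # take at most `priority` jobs from this queue before moving on
      priority.times do
        job = pending[name].shift
        break unless job # queue is empty, go to the next one
        processed << name
      end
    end
  end
end

counts = processed.tally # number of jobs processed per queue
```

With both queues full, every pass drains ten urgent jobs and one default job, which is where the rough 10-to-1 ratio comes from.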

{
  config_path: "./litejob.yml",
  path: Litesupport.root.join("queue.sqlite3"),
  queues: [["default", 1]],
  workers: 5,
  retries: 5,
  retry_delay: 60,
  retry_delay_multiplier: 10,
  dead_job_retention: 10 * 24 * 3600,
  gc_sleep_interval: 7200,
  logger: "STDOUT",
  sleep_intervals: [0.001, 0.005, 0.025, 0.125, 0.625, 1.0, 2.0],
  metrics: false
}
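As a rough illustration of how overriding works, the sketch below merges caller options over a subset of the defaults. It assumes a plain Hash#merge and ignores the configuration file; the actual merging logic lives outside this page and may differ.

```ruby
# Subset of the defaults shown above, copied for illustration only.
defaults = {
  queues: [["default", 1]],
  workers: 5,
  retries: 5,
  metrics: false
}

# Options a caller might pass to Litejobqueue.new.
overrides = {
  queues: [["default", 1, "spawn"], ["urgent", 10]],
  workers: 2
}

# Assumption: caller-supplied keys replace the defaults,
# while untouched keys keep their default values.
effective = defaults.merge(overrides)
```

Note that a key such as queues is replaced wholesale rather than combined with the default list.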
@@queue =
nil

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Methods included from Litemetric::Measurable

#capture, #capture_snapshot, #collect_metrics, #create_snapshotter, #measure

Methods inherited from Litequeue

#clear, #count, #find, #pop, #queues_info, #snapshot

Methods included from Litesupport::Liteconnection

#close, #journal_mode, #options, #path, #size, #synchronous

Methods included from Litesupport::Forkable

#_fork

Constructor Details

#initialize(options = {}) ⇒ Litejobqueue

Creates a new queue instance (only one instance is created per process).

jobqueue = Litejobqueue.new


# File 'lib/litestack/litejobqueue.rb', line 69

def initialize(options = {})
  @queues = [] # a place holder to allow workers to process
  super(options)

  # group and order queues according to their priority
  pgroups = {}
  @options[:queues].each do |q|
    pgroups[q[1]] = [] unless pgroups[q[1]]
    pgroups[q[1]] << [q[0], q[2] == "spawn"]
  end
  @queues = pgroups.keys.sort.reverse.collect { |p| [p, pgroups[p]] }
  collect_metrics if @options[:metrics]
end
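To make the grouping step concrete, the following standalone sketch applies the same logic to a sample queue list and keeps the resulting structure. The method name group_by_priority and the "mailers" queue are hypothetical, introduced only for this example.

```ruby
# Standalone re-statement of the grouping done in #initialize.
def group_by_priority(queues)
  pgroups = {}
  queues.each do |name, priority, mode|
    # queues sharing a priority land in the same group;
    # the third element is optional, so mode may be nil
    (pgroups[priority] ||= []) << [name, mode == "spawn"]
  end
  # highest priority first
  pgroups.keys.sort.reverse.map { |p| [p, pgroups[p]] }
end

grouped = group_by_priority([["default", 1, "spawn"], ["urgent", 10], ["mailers", 10]])
# grouped is now:
# [[10, [["urgent", false], ["mailers", false]]], [1, [["default", true]]]]
```

Each entry pairs a priority with the queues at that level, and the boolean records whether jobs from that queue should be spawned in their own execution context.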

Instance Attribute Details

#running ⇒ Object (readonly)

Returns the value of attribute running.



# File 'lib/litestack/litejobqueue.rb', line 49

def running
  @running
end

Class Method Details

.jobqueue(options = {}) ⇒ Object

Returns the single instance of the job queue, for use by Litejob.



# File 'lib/litestack/litejobqueue.rb', line 55

def self.jobqueue(options = {})
  @@queue ||= Litescheduler.synchronize { new(options) }
end

.new(options = {}) ⇒ Object



# File 'lib/litestack/litejobqueue.rb', line 59

def self.new(options = {})
  return @@queue if @@queue
  @@queue = allocate
  @@queue.send(:initialize, options)
  @@queue
end
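The effect of overriding .new this way can be seen with a small stand-in class. SingleQueue is hypothetical and not part of Litestack; it only mirrors the allocate/initialize pattern shown above.

```ruby
class SingleQueue # hypothetical class, for illustration only
  @@instance = nil
  attr_reader :options

  # Same shape as Litejobqueue.new: build the object once, then reuse it.
  def self.new(options = {})
    return @@instance if @@instance
    @@instance = allocate
    @@instance.send(:initialize, options)
    @@instance
  end

  def initialize(options = {})
    @options = options
  end
end

first = SingleQueue.new(workers: 2)
second = SingleQueue.new(workers: 8) # options are ignored, the first instance is returned
```

A consequence of this pattern is that options passed on any call after the first have no effect, because initialize is never run again.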

Instance Method Details

#_push ⇒ Object



# File 'lib/litestack/litejobqueue.rb', line 51

alias_method :_push, :push

#delete(id) ⇒ Object

delete a job from the job queue

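class EasyJob
  def perform(any, number, of_params)
    # do anything
  end
end
jobqueue = Litejobqueue.new
params = [1, 2, 3] # example values, assumed to be an array of arguments for #perform
# push returns a pair whose first element is the job id (see the log line in #push)
id, _queue = jobqueue.push(EasyJob, params, 10) # queue for processing in 10 seconds
jobqueue.delete(id)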


# File 'lib/litestack/litejobqueue.rb', line 119

def delete(id)
  job = super(id)
  @logger.info("[litejob]:[DEL] job: #{job}")
  job = Oj.load(job[0], symbol_keys: true) if job
  job
end

#metrics_identifier ⇒ Object



# File 'lib/litestack/litejobqueue.rb', line 83

def metrics_identifier
  "Litejob" # overrides default identifier
end

#push(jobclass, params, delay = 0, queue = nil) ⇒ Object

push a job to the queue

class EasyJob
  def perform(any, number, of_params)
    # do anything
  end
end
jobqueue = Litejobqueue.new
params = [1, 2, 3] # example values, assumed to be an array of arguments for #perform
jobqueue.push(EasyJob, params) # the job will be performed asynchronously


# File 'lib/litestack/litejobqueue.rb', line 95

def push(jobclass, params, delay = 0, queue = nil)
  payload = Oj.dump({klass: jobclass, params: params, retries: @options[:retries], queue: queue}, mode: :strict)
  res = super(payload, delay, queue)
  capture(:enqueue, queue)
  @logger.info("[litejob]:[ENQ] queue:#{res[1]} class:#{jobclass} job:#{res[0]}")
  res
end
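The payload built by push is a JSON document with four keys. The round trip below sketches its shape using the standard library's JSON module as a stand-in for Oj, so it runs without extra gems; the class name string and parameter values are made up for the example.

```ruby
require "json"

# Same keys as the hash serialized in #push.
# Assumption: stdlib JSON replaces Oj here, and the class is given by name.
payload = JSON.generate({klass: "EasyJob", params: [1, "two", 3.0], retries: 5, queue: nil})

# Reading it back with symbol keys, comparable to Oj.load(..., symbol_keys: true) in #delete.
job = JSON.parse(payload, symbolize_names: true)
```

Since the payload is plain JSON, job parameters need to be values that survive serialization, such as numbers, strings, arrays, hashes, booleans and nil.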

#repush(id, job, delay = 0, queue = nil) ⇒ Object



# File 'lib/litestack/litejobqueue.rb', line 103

def repush(id, job, delay = 0, queue = nil)
  res = super(id, Oj.dump(job, mode: :strict), delay, queue)
  capture(:enqueue, queue)
  @logger.info("[litejob]:[ENQ] queue:#{res[0]} class:#{job[:klass]} job:#{id}")
  res
end

#stop ⇒ Object

Stops the queue object without deleting the jobs in the queue. This is particularly useful for testing.



# File 'lib/litestack/litejobqueue.rb', line 134

def stop
  @running = false
  # @@queue = nil
  close
end