Class: Litequeue

Inherits:
Object
Includes:
Litesupport::Liteconnection
Defined in:
lib/litestack/litequeue.rb

Overview

Litequeue is a simple queueing system for Ruby applications that lets you push values to a queue and pop them off again. It provides a straightforward API for working with named queues and for adding and removing values from them. A value can also be pushed with a delay, so that it only becomes available to pop after a given number of seconds, which is useful for deferring processing until a later time.

Litequeue is built on top of SQLite and stores its queues in a local database file (queue.sqlite3 by default), which keeps it fast and efficient without a separate queue server, even when handling large volumes of data. This lightweight, easy-to-use system is a good foundation for more advanced job processing frameworks that need basic queuing capabilities.

Direct Known Subclasses

Litejobqueue

Constant Summary

DEFAULT_OPTIONS =
{
  path: Litesupport.root.join("queue.sqlite3"),
  mmap_size: 32 * 1024,
  sync: 0
}

Instance Method Summary

Methods included from Litesupport::Liteconnection

#close, #journal_mode, #options, #path, #size, #synchronous

Methods included from Litesupport::Forkable

#_fork

Constructor Details

#initialize(options = {}) ⇒ Litequeue

Creates a new Litequeue instance. Accepts an optional options hash, which is merged with DEFAULT_OPTIONS.

queue = Litequeue.new
queue.push("somevalue", 2) # the value will be ready to pop in 2 seconds
queue.pop # => nil
sleep 2
queue.pop # => "somevalue"


# File 'lib/litestack/litequeue.rb', line 38

def initialize(options = {})
  init(options)
end
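
The option merging described above can be sketched in plain Ruby. This is only an illustration under assumptions: sketch_defaults and sketch_merge_options are hypothetical names, the default path is built from a plain Pathname instead of Litesupport.root, and the merge is assumed to behave like Hash#merge (caller-supplied keys win).

```ruby
require "pathname"

# Hypothetical stand-in for Litequeue::DEFAULT_OPTIONS. The real constant
# derives its path from Litesupport.root; a plain Pathname is used here.
def sketch_defaults
  {
    path: Pathname.new("db").join("queue.sqlite3"),
    mmap_size: 32 * 1024,
    sync: 0
  }
end

# Assumed merge behaviour: keys given by the caller override the defaults,
# keys that are not given fall back to the defaults.
def sketch_merge_options(options = {})
  sketch_defaults.merge(options)
end

merged = sketch_merge_options({path: "tmp/jobs.sqlite3"})
puts merged[:path]      # tmp/jobs.sqlite3
puts merged[:mmap_size] # 32768
puts merged[:sync]      # 0
```

With the real class, the equivalent call would presumably be Litequeue.new(path: "tmp/jobs.sqlite3"), leaving mmap_size and sync at their defaults.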

Instance Method Details

#clear(queue = nil) ⇒ Object

Deletes all entries in all queues or, if a queue name is given, all entries in that queue only.



# File 'lib/litestack/litequeue.rb', line 79

def clear(queue = nil)
  run_sql("DELETE FROM queue WHERE iif(?1 IS NOT NULL, name = ?1,  TRUE)", queue)
end

#count(queue = nil) ⇒ Object

Returns the number of entries in all queues or, if a queue name is given, the number of entries in that queue only.



# File 'lib/litestack/litequeue.rb', line 84

def count(queue = nil)
  run_sql("SELECT count(*) FROM queue WHERE iif(?1 IS NOT NULL, name = ?1, TRUE)", queue)[0][0]
end
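
Both #clear and #count rely on the same SQL filter, iif(?1 IS NOT NULL, name = ?1, TRUE): without a queue name every row matches, with a name only that queue's rows match. A minimal plain-Ruby sketch of that predicate follows; the in-memory rows and the helper names (matches_queue?, sketch_count, sketch_clear) are hypothetical and only model the filter, not the SQLite table.

```ruby
# Hypothetical in-memory rows standing in for the queue table: [name, value].
rows = [
  ["default", "a"],
  ["default", "b"],
  ["mailers", "c"]
]

# Mirrors iif(?1 IS NOT NULL, name = ?1, TRUE):
# with no queue name every row matches, otherwise only rows of that queue.
def matches_queue?(row_name, queue = nil)
  queue.nil? ? true : row_name == queue
end

# Counts the rows selected by the filter, like #count.
def sketch_count(rows, queue = nil)
  rows.count { |name, _value| matches_queue?(name, queue) }
end

# Returns the rows that survive a delete with the same filter, like #clear.
def sketch_clear(rows, queue = nil)
  rows.reject { |name, _value| matches_queue?(name, queue) }
end

puts sketch_count(rows)                    # 3
puts sketch_count(rows, "default")         # 2
puts sketch_clear(rows, "default").length  # 1
puts sketch_clear(rows).length             # 0
```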

#delete(id) ⇒ Object

Deletes an item from the queue by its id.

queue = Litequeue.new
id = queue.push("somevalue")
queue.delete(id) # => "somevalue"
queue.pop # => nil


# File 'lib/litestack/litequeue.rb', line 74

def delete(id)
  run_stmt(:delete, id)[0]
end

#find(opts = {}) ⇒ Object



# File 'lib/litestack/litequeue.rb', line 115

def find(opts = {})
  run_stmt(:search, prepare_search_options(opts))
end

#pop(queue = "default", limit = 1) ⇒ Object

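Pops an item from the queue, optionally from a specific named queue (the queue name defaults to "default"). With a limit greater than 1, up to limit items are popped at once. Returns nil when nothing is ready, the single item when exactly one was popped, and an array of items otherwise.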



# File 'lib/litestack/litequeue.rb', line 62

def pop(queue = "default", limit = 1)
  res = run_stmt(:pop, queue, limit)
  return res[0] if res.length == 1
  return nil if res.empty?
  res
end
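
Because #pop returns three different shapes, callers need to handle each of them. The branching can be sketched in isolation as below; shape_pop_result is a hypothetical helper, and its argument stands in for whatever run_stmt(:pop, queue, limit) returns (assumed here to be an array of rows, represented by plain strings).

```ruby
# Mirrors the branching at the end of #pop. `rows` stands in for the result
# of run_stmt(:pop, queue, limit), assumed to be an array of popped rows.
def shape_pop_result(rows)
  return nil if rows.empty?           # nothing was ready to pop
  return rows[0] if rows.length == 1  # exactly one item: unwrap it
  rows                                # several items: return them all
end

p shape_pop_result([])          # nil
p shape_pop_result(["a"])       # "a"
p shape_pop_result(["a", "b"])  # ["a", "b"]
```

Note that a call with limit greater than 1 can still return a single unwrapped item when only one entry was ready.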

#push(value, delay = 0, queue = "default") ⇒ Object Also known as: <<

Pushes an item to the queue, optionally specifying after how many seconds it should be ready to pop (defaults to zero) and the queue name (defaults to "default"). Returns a unique job id, which can be used later to delete the item before it fires. You can push string, integer, float, true, false, or nil values.



# File 'lib/litestack/litequeue.rb', line 45

def push(value, delay = 0, queue = "default")
  # @todo - check if queue is busy, back off if it is
  # also bring back the synchronize block, to prevent
  # a race condition if a thread hits the busy handler
  # before the current thread proceeds after a backoff
  # id = SecureRandom.uuid # this is somehow expensive, can we improve?
  run_stmt(:push, queue, delay, value)[0]
end
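
The effect of the delay argument can be modelled with a small in-memory class. This is a sketch under assumptions, not how Litequeue stores data: DelaySketch is a hypothetical class, each entry is assumed to carry a "ready at" timestamp of (now + delay), and the clock is injectable so the example runs without sleeping. Job ids are left out.

```ruby
# In-memory model of delayed delivery; NOT the SQLite-backed implementation.
# Assumption: an entry becomes poppable once its ready_at time has passed.
class DelaySketch
  def initialize(clock = -> { Time.now.to_f })
    @clock = clock
    @entries = [] # each entry: [ready_at, queue, value]
  end

  # The real #push returns a job id; this sketch returns nil.
  def push(value, delay = 0, queue = "default")
    @entries << [@clock.call + delay, queue, value]
    nil
  end

  # Returns the value of the earliest ready entry in the queue, or nil.
  def pop(queue = "default")
    now = @clock.call
    ready = @entries.select { |ready_at, name, _value| name == queue && ready_at <= now }
    entry = ready.min_by { |ready_at, _name, _value| ready_at }
    return nil unless entry
    @entries.delete_at(@entries.index { |e| e.equal?(entry) })
    entry[2]
  end
end

now = 0.0
q = DelaySketch.new(-> { now }) # fake clock controlled by the `now` variable
q.push("somevalue", 2)
p q.pop   # nil, the value is not ready yet
now = 2.0
p q.pop   # "somevalue"
p q.pop   # nil, the queue is empty again
```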

#queues_info ⇒ Object

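Returns summary information about the queues by running the prepared :info statement. As used in #snapshot, each returned row starts with the queue name, followed by the number of entries in that queue.

Note: the source comment above this method describes a disabled size method rather than queues_info. It reads "return the size of the queue file on disk" and wraps the following code:

# def size
#   run_sql("SELECT size.page_size * count.page_count FROM pragma_page_size() AS size, pragma_page_count() AS count")[0][0]
# end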



# File 'lib/litestack/litequeue.rb', line 93

def queues_info
  run_stmt(:info)
end

#repush(id, value, delay = 0, queue = "default") ⇒ Object Also known as: <<<



# File 'lib/litestack/litequeue.rb', line 54

def repush(id, value, delay = 0, queue = "default")
  run_stmt(:repush, id, queue, delay, value)[0]
end

#snapshot ⇒ Object



# File 'lib/litestack/litequeue.rb', line 97

def snapshot
  queues = {}
  queues_info.each do |qc|
    # queues[qc[0]] = {count: qc[1], time_in_queue: {avg: qc[2], min: qc[3], max: qc[4]}}
    queues[qc[0]] = qc[1]
  end
  {
    summary: {
      path: path,
      journal_mode: journal_mode,
      synchronous: synchronous,
      size: size,
      jobs: count
    },
    queues: queues
  }
end
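
The shape of the hash built by #snapshot can be illustrated without a database. In the sketch below the rows, the summary values, and the variable names are all hypothetical placeholders; only the mapping queues[qc[0]] = qc[1] and the key layout are taken from the method above. The jobs value is computed as the sum of the per-queue counts here, whereas the real method calls #count.

```ruby
# Hypothetical rows in the shape #snapshot expects from queues_info:
# queue name first, entry count second (any further columns are ignored).
info_rows = [
  ["default", 12, 0.5],
  ["mailers", 3, 1.2]
]

# Same mapping as the loop in #snapshot: queues[qc[0]] = qc[1]
queues = info_rows.to_h { |qc| [qc[0], qc[1]] }

# Placeholder summary values; the real ones come from the connection
# (#path, #journal_mode, #synchronous, #size) and from #count.
snapshot = {
  summary: {
    path: "queue.sqlite3",
    journal_mode: "wal",
    synchronous: 0,
    size: 4096,
    jobs: queues.values.sum
  },
  queues: queues
}

puts snapshot[:queues]["default"]  # 12
puts snapshot[:queues]["mailers"]  # 3
puts snapshot[:summary][:jobs]     # 15
```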