Class: Litejob::Server
- Inherits:
-
Object
- Object
- Litejob::Server
- Defined in:
- lib/litejob/server.rb
Overview
Litejob::Server is responsible for popping job payloads from the SQLite queue.
Instance Method Summary collapse
-
#initialize(queues = ["default"]) ⇒ Server
constructor
A new instance of Server.
- #pop(queue) ⇒ Object
- #run! ⇒ Object
Constructor Details
#initialize(queues = ["default"]) ⇒ Server
Returns a new instance of Server.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# File 'lib/litejob/server.rb', line 10 def initialize(queues = ["default"]) @queue = Litequeue.instance @scheduler = Litescheduler.instance @queues = queues # group and order queues according to their priority @prioritized_queues = queues.each_with_object({}) do |(name, priority, spawns), memo| priority ||= 5 memo[priority] ||= [] memo[priority] << [name, spawns == "spawn"] end.sort_by do |priority, _| -priority end @running = true @sleep_intervals = [0.001, 0.005, 0.025, 0.125, 0.625, 1.0, 2.0] run! end |
Instance Method Details
#pop(queue) ⇒ Object
27 28 29 30 31 32 33 34 |
# File 'lib/litejob/server.rb', line 27 def pop(queue) result = @queue.pop(queue: queue) return result unless result.is_a?(Array) return false if result.empty? result end |
#run! ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/litejob/server.rb', line 36 def run! @scheduler.spawn do Litejob.logger.info("[litejob]:[RUN] id=#{@scheduler.context.object_id}") worker_sleep_index = 0 while @running processed = 0 @prioritized_queues.each do |priority, queues| queues.each do |queue, spawns| batched = 0 while (batched < priority) && (payload = pop(queue)) batched += 1 processed += 1 id, serialized_job = payload processor = Processor.new(queue, id, serialized_job) processor.process! # give other contexts a chance to run here @scheduler.switch end end if processed == 0 sleep @sleep_intervals[worker_sleep_index] worker_sleep_index += 1 if worker_sleep_index < @sleep_intervals.length - 1 else worker_sleep_index = 0 # reset the index end end end end end |