Class: Litejob::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/litejob/server.rb

Overview

Litejob::Server is responsible for popping job payloads from the SQLite queue.

Instance Method Summary collapse

Constructor Details

#initialize(queues = ["default"]) ⇒ Server

Returns a new instance of Server.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/litejob/server.rb', line 10

# Builds the server state and immediately starts the worker loop via #run!.
#
# @param queues [Array] queue descriptors; each entry is either a bare queue
#   name or an array of the form [name, priority, spawns]. A missing (nil or
#   false) priority falls back to 5, and the spawn flag is recorded as true
#   only when the third element equals the string "spawn".
# @return [Server]
def initialize(queues = ["default"])
  @queue = Litequeue.instance
  @scheduler = Litescheduler.instance
  @queues = queues

  # Bucket the queues by priority level, keeping the order in which they were given.
  by_priority = Hash.new { |groups, level| groups[level] = [] }
  queues.each do |name, priority, spawns|
    by_priority[priority || 5] << [name, spawns == "spawn"]
  end

  # Highest priority first: an array of [priority, [[name, spawn_flag], ...]] pairs.
  @prioritized_queues = by_priority.sort_by { |level, _members| -level }

  @running = true
  @sleep_intervals = [0.001, 0.005, 0.025, 0.125, 0.625, 1.0, 2.0]
  run!
end

Instance Method Details

#pop(queue) ⇒ Object



27
28
29
30
31
32
33
34
# File 'lib/litejob/server.rb', line 27

# Pops the next payload from the named queue of the underlying queue object.
#
# @param queue [Object] queue identifier, forwarded as the +queue:+ keyword
# @return [Object, false] +false+ when the underlying pop returned an empty
#   Array; otherwise whatever the underlying pop returned, unchanged
def pop(queue)
  popped = @queue.pop(queue: queue)

  # An empty Array is normalised to false so callers can use the result as a condition.
  if popped.is_a?(Array) && popped.empty?
    false
  else
    popped
  end
end

#run! ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/litejob/server.rb', line 36

# Starts the worker loop inside a context spawned on the scheduler.
#
# While +@running+ is truthy, each pass walks +@prioritized_queues+ in order
# and, for every queue, pops and processes at most +priority+ payloads before
# moving on. After a complete pass over all priority groups:
#
# * if nothing was processed, the worker sleeps for the current entry of
#   +@sleep_intervals+ and advances to the next (longer) entry, stopping at
#   the last one;
# * otherwise the back-off index is reset to the first entry.
#
# The back-off decision is taken once per pass, not once per priority group,
# so empty high-priority queues never delay polling of lower-priority ones,
# and a server with no queues still sleeps instead of spinning.
#
# @return [Object] whatever +@scheduler.spawn+ returns
def run!
  @scheduler.spawn do
    Litejob.logger.info("[litejob]:[RUN] id=#{@scheduler.context.object_id}")
    worker_sleep_index = 0
    while @running
      processed = 0
      @prioritized_queues.each do |priority, queues|
        # The spawn flag stored alongside each queue name is not used by this loop.
        queues.each do |queue, _spawns|
          batched = 0
          # The priority doubles as the per-pass batch size for the queue.
          while (batched < priority) && (payload = pop(queue))
            batched += 1
            processed += 1

            id, serialized_job = payload
            processor = Processor.new(queue, id, serialized_job)
            processor.process!

            # give other contexts a chance to run here
            @scheduler.switch
          end
        end
      end

      # Back off only after every priority group has been polled.
      if processed.zero?
        sleep @sleep_intervals[worker_sleep_index]
        worker_sleep_index += 1 if worker_sleep_index < @sleep_intervals.length - 1
      else
        worker_sleep_index = 0 # reset the index
      end
    end
  end
end