Class: Procrastinator::QueueWorker

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/procrastinator/queue_worker.rb

Overview

A QueueWorker checks for tasks to run from the task store and executes them, updating information in the task store as necessary.

Author:

  • Robin Miller

Constant Summary collapse

PERSISTER_METHODS =

expected methods for all persistence strategies

[:read, :update, :delete].freeze
NULL_FILE =
File.open(File::NULL, File::WRONLY)

Instance Method Summary collapse

Constructor Details

#initialize(queue:, config:) ⇒ QueueWorker

Returns a new instance of QueueWorker.

Raises:

  • (ArgumentError)


18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/procrastinator/queue_worker.rb', line 18

def initialize(queue:, config:)
   raise ArgumentError, ':queue cannot be nil' if queue.nil?
   raise ArgumentError, ':config cannot be nil' if config.nil?

   @config = config

   @queue = if queue.is_a? Symbol
               config.queue(name: queue)
            else
               queue
            end

   @scheduler = Scheduler.new(config)
   @logger    = Logger.new(File::NULL)
end

Instance Method Details

#haltObject

Logs halting the queue



70
71
72
73
# File 'lib/procrastinator/queue_worker.rb', line 70

def halt
   @logger&.info("Halted worker on queue: #{ name }")
   @logger&.close
end

#open_log!(name, config) ⇒ Object

Starts a log file and returns the created Logger



76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/procrastinator/queue_worker.rb', line 76

def open_log!(name, config)
   if config.log_level
      log_path = config.log_dir / "#{ name }.log"

      config.log_dir.mkpath
      FileUtils.touch(log_path)
   else
      log_path = NULL_FILE
   end

   Logger.new(log_path.to_path,
              config.log_shift_age, config.log_shift_size,
              level:     config.log_level || Logger::FATAL,
              progname:  name,
              formatter: Config::DEFAULT_LOG_FORMATTER)
end

#work!Object

Works on jobs forever



35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/procrastinator/queue_worker.rb', line 35

def work!
   @logger = open_log!("#{ name }-queue-worker", @config)
   @logger.info "Started worker thread to consume queue: #{ name }"

   loop do
      sleep(@queue.update_period)

      work_one
   end
rescue StandardError => e
   @logger.fatal(e)

   raise
end

#work_oneObject

Performs exactly one task on the queue



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/procrastinator/queue_worker.rb', line 51

def work_one
   task = next_task(logger:    @logger,
                    container: @config.container,
                    scheduler: @scheduler) || return

   begin
      task.run

      @queue.delete(task.id)
   rescue StandardError => e
      task.fail(e)

      task_info = task.to_h
      id        = task_info.delete(:id)
      @queue.update(id, **task_info)
   end
end