Class: Procrastinator::QueueWorker
- Inherits:
-
Object
- Object
- Procrastinator::QueueWorker
- Extended by:
- Forwardable
- Defined in:
- lib/procrastinator/queue_worker.rb
Overview
A QueueWorker checks for tasks to run from the task store and executes them, updating information in the task store as necessary.
Constant Summary collapse
- PERSISTER_METHODS =
expected methods for all persistence strategies
[:read, :update, :delete].freeze
- NULL_FILE =
File.open(File::NULL, File::WRONLY)
Instance Method Summary collapse
-
#halt ⇒ Object
Logs halting the queue.
-
#initialize(queue:, config:) ⇒ QueueWorker
constructor
A new instance of QueueWorker.
-
#open_log!(name, config) ⇒ Object
Starts a log file and returns the created Logger.
-
#work! ⇒ Object
Works on jobs forever.
-
#work_one ⇒ Object
Performs exactly one task on the queue.
Constructor Details
#initialize(queue:, config:) ⇒ QueueWorker
Returns a new instance of QueueWorker.
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/procrastinator/queue_worker.rb', line 18 def initialize(queue:, config:) raise ArgumentError, ':queue cannot be nil' if queue.nil? raise ArgumentError, ':config cannot be nil' if config.nil? @config = config @queue = if queue.is_a? Symbol config.queue(name: queue) else queue end @scheduler = Scheduler.new(config) @logger = Logger.new(File::NULL) end |
Instance Method Details
#halt ⇒ Object
Logs halting the queue
70 71 72 73 |
# File 'lib/procrastinator/queue_worker.rb', line 70 def halt @logger&.info("Halted worker on queue: #{ name }") @logger&.close end |
#open_log!(name, config) ⇒ Object
Starts a log file and returns the created Logger
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
# File 'lib/procrastinator/queue_worker.rb', line 76 def open_log!(name, config) if config.log_level log_path = config.log_dir / "#{ name }.log" config.log_dir.mkpath FileUtils.touch(log_path) else log_path = NULL_FILE end Logger.new(log_path.to_path, config.log_shift_age, config.log_shift_size, level: config.log_level || Logger::FATAL, progname: name, formatter: Config::DEFAULT_LOG_FORMATTER) end |
#work! ⇒ Object
Works on jobs forever
35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/procrastinator/queue_worker.rb', line 35 def work! @logger = open_log!("#{ name }-queue-worker", @config) @logger.info "Started worker thread to consume queue: #{ name }" loop do sleep(@queue.update_period) work_one end rescue StandardError => e @logger.fatal(e) raise end |
#work_one ⇒ Object
Performs exactly one task on the queue
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/procrastinator/queue_worker.rb', line 51 def work_one task = next_task(logger: @logger, container: @config.container, scheduler: @scheduler) || return begin task.run @queue.delete(task.id) rescue StandardError => e task.fail(e) task_info = task.to_h id = task_info.delete(:id) @queue.update(id, **task_info) end end |