Class: Procrastinator::QueueWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/procrastinator/queue_worker.rb

Constant Summary collapse

DEFAULT_TIMEOUT =

in seconds; one hour total

3600
DEFAULT_MAX_ATTEMPTS =
20
DEFAULT_UPDATE_PERIOD =

seconds

10
DEFAULT_MAX_TASKS =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, persister:, log_dir: nil, log_level: Logger::INFO, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, max_tasks: DEFAULT_MAX_TASKS) ⇒ QueueWorker

Timeout is in seconds

Raises:

  • (ArgumentError)


11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/procrastinator/queue_worker.rb', line 11

def initialize(name:,
               persister:,
               log_dir: nil,
               log_level: Logger::INFO,
               max_attempts: DEFAULT_MAX_ATTEMPTS,
               timeout: DEFAULT_TIMEOUT,
               update_period: DEFAULT_UPDATE_PERIOD,
               max_tasks: DEFAULT_MAX_TASKS)
   raise ArgumentError.new('Queue name may not be nil') unless name
   raise ArgumentError.new('Persister may not be nil') unless persister

   raise(MalformedTaskPersisterError.new('The supplied IO object must respond to #read_tasks')) unless persister.respond_to? :read_tasks
   raise(MalformedTaskPersisterError.new('The supplied IO object must respond to #update_task')) unless persister.respond_to? :update_task
   raise(MalformedTaskPersisterError.new('The supplied IO object must respond to #delete_task')) unless persister.respond_to? :delete_task

   @name          = name.to_s.gsub(/\s/, '_').to_sym
   @timeout       = timeout
   @max_attempts  = max_attempts
   @update_period = update_period
   @max_tasks     = max_tasks
   @persister     = persister
   @log_dir       = log_dir
   @log_level     = log_level

   start_log
end

Instance Attribute Details

#max_attemptsObject (readonly)

Returns the value of attribute max_attempts.



8
9
10
# File 'lib/procrastinator/queue_worker.rb', line 8

def max_attempts
  @max_attempts
end

#max_tasksObject (readonly)

Returns the value of attribute max_tasks.



8
9
10
# File 'lib/procrastinator/queue_worker.rb', line 8

def max_tasks
  @max_tasks
end

#nameObject (readonly)

Returns the value of attribute name.



8
9
10
# File 'lib/procrastinator/queue_worker.rb', line 8

def name
  @name
end

#timeoutObject (readonly)

Returns the value of attribute timeout.



8
9
10
# File 'lib/procrastinator/queue_worker.rb', line 8

def timeout
  @timeout
end

#update_periodObject (readonly)

Returns the value of attribute update_period.



8
9
10
# File 'lib/procrastinator/queue_worker.rb', line 8

def update_period
  @update_period
end

Instance Method Details

#actObject



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/procrastinator/queue_worker.rb', line 51

def act
   # shuffling and re-sorting to avoid worst case O(n^2) on quicksort (which is default ruby sort)
   # when receiving already sorted data. Ideally, we'd use a better algo, but this will do for now
   tasks = @persister.read_tasks(@name).reject { |t| t[:run_at].nil? }.shuffle.sort_by { |t| t[:run_at] }

   tasks.first(@max_tasks).each do |task_data|
      if Time.now.to_i >= task_data[:run_at].to_i
         task_data.merge!(logger: @logger) if @logger

         tw = TaskWorker.new(task_data)

         tw.work

         if tw.successful?
            @persister.delete_task(task_data[:id])
         else
            @persister.update_task(tw.to_hash.merge(queue: @name))
         end
      end
   end
end

#log_parent_exitObject

Raises:

  • (RuntimeError)


101
102
103
104
105
# File 'lib/procrastinator/queue_worker.rb', line 101

def log_parent_exit
   raise RuntimeError.new('Cannot log when logger not defined. Call #start_log first.') unless @logger

   @logger.error("Terminated worker process (#{Process.pid}) due to main process (#{Process.ppid}) disappearing.")
end

#long_nameObject



73
74
75
# File 'lib/procrastinator/queue_worker.rb', line 73

def long_name
   "#{@name}-queue-worker"
end

#start_logObject

Starts a log file and stores the logger within this queue worker.

Separate from init because logging is context-dependent



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/procrastinator/queue_worker.rb', line 80

def start_log
   if @log_dir
      log_path = Pathname.new("#{@log_dir}/#{long_name}.log")

      log_path.dirname.mkpath
      File.open(log_path.to_path, 'a+') do |f|
         f.write ''
      end

      @logger = Logger.new(log_path.to_path)

      @logger.level = @log_level

      @logger.info(['',
                    '===================================',
                    "Started worker process, #{long_name}, to work off queue #{@name}.",
                    "Worker pid=#{Process.pid}; parent pid=#{Process.ppid}.",
                    '==================================='].join("\n"))
   end
end

#workObject



38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/procrastinator/queue_worker.rb', line 38

def work
   begin
      loop do
         sleep(@update_period)

         act
      end
   rescue StandardError => e
      @logger.fatal(e)
         # raise e
   end
end