Class: Procrastinator::QueueWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/procrastinator/queue_worker.rb

Constant Summary collapse

DEFAULT_TIMEOUT =

seconds = one hour

3600
DEFAULT_MAX_ATTEMPTS =
20
DEFAULT_UPDATE_PERIOD =

seconds

10
DEFAULT_MAX_TASKS =
10

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(name:, persister:, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, max_tasks: DEFAULT_MAX_TASKS) ⇒ QueueWorker

Timeout is in seconds

Raises:

  • (ArgumentError)


11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/procrastinator/queue_worker.rb', line 11

def initialize(name:,
               persister:,
               max_attempts: DEFAULT_MAX_ATTEMPTS,
               timeout: DEFAULT_TIMEOUT,
               update_period: DEFAULT_UPDATE_PERIOD,
               max_tasks: DEFAULT_MAX_TASKS)
   raise ArgumentError.new('Queue name may not be nil') unless name
   raise ArgumentError.new('Persister may not be nil') unless persister

   raise(MalformedTaskPersisterError.new('The supplied IO object must respond to #read_tasks')) unless persister.respond_to? :read_tasks
   raise(MalformedTaskPersisterError.new('The supplied IO object must respond to #update_task')) unless persister.respond_to? :update_task
   raise(MalformedTaskPersisterError.new('The supplied IO object must respond to #delete_task')) unless persister.respond_to? :delete_task


   @name          = name.to_s.gsub(/\s/, '_').to_sym
   @timeout       = timeout
   @max_attempts  = max_attempts
   @update_period = update_period
   @max_tasks     = max_tasks
   @persister     = persister
end

Instance Attribute Details

#max_attemptsObject (readonly)

Returns the value of attribute max_attempts.



8
9
10
# File 'lib/procrastinator/queue_worker.rb', line 8

def max_attempts
  @max_attempts
end

#max_tasksObject (readonly)

Returns the value of attribute max_tasks.



8
9
10
# File 'lib/procrastinator/queue_worker.rb', line 8

def max_tasks
  @max_tasks
end

#nameObject (readonly)

Returns the value of attribute name.



8
9
10
# File 'lib/procrastinator/queue_worker.rb', line 8

def name
  @name
end

#timeoutObject (readonly)

Returns the value of attribute timeout.



8
9
10
# File 'lib/procrastinator/queue_worker.rb', line 8

def timeout
  @timeout
end

#update_periodObject (readonly)

Returns the value of attribute update_period.



8
9
10
# File 'lib/procrastinator/queue_worker.rb', line 8

def update_period
  @update_period
end

Instance Method Details

#workObject



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/procrastinator/queue_worker.rb', line 33

def work
   loop do
      sleep(@update_period)

      # shuffling and re-sorting to avoid worst case O(n^2) on quicksort
      # when receiving already sorted data. Ideally, we'd use a better algo, but this will do for now
      tasks = @persister.read_tasks(@name).shuffle.sort_by { |t| t[:run_at] }

      tasks.first(@max_tasks).each do |task_data|
         if Time.now.to_i >= task_data[:run_at].to_i
            tw = TaskWorker.new(task_data)

            tw.work

            if tw.successful?
               @persister.delete_task(task_data[:id])
            else
               @persister.update_task(tw.to_hash.merge(queue: @name))
            end
         end
      end
   end
end