Class: Procrastinator::QueueWorker
- Inherits:
-
Object
- Object
- Procrastinator::QueueWorker
- Defined in:
- lib/procrastinator/queue_worker.rb
Constant Summary collapse
- DEFAULT_TIMEOUT =
seconds = one hour
3600- DEFAULT_MAX_ATTEMPTS =
20- DEFAULT_UPDATE_PERIOD =
seconds
10- DEFAULT_MAX_TASKS =
10
Instance Attribute Summary collapse
-
#max_attempts ⇒ Object
readonly
Returns the value of attribute max_attempts.
-
#max_tasks ⇒ Object
readonly
Returns the value of attribute max_tasks.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#timeout ⇒ Object
readonly
Returns the value of attribute timeout.
-
#update_period ⇒ Object
readonly
Returns the value of attribute update_period.
Instance Method Summary collapse
-
#initialize(name:, persister:, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, max_tasks: DEFAULT_MAX_TASKS) ⇒ QueueWorker
constructor
Timeout is in seconds.
- #work ⇒ Object
Constructor Details
#initialize(name:, persister:, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, max_tasks: DEFAULT_MAX_TASKS) ⇒ QueueWorker
Timeout is in seconds
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/procrastinator/queue_worker.rb', line 11 def initialize(name:, persister:, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, max_tasks: DEFAULT_MAX_TASKS) raise ArgumentError.new('Queue name may not be nil') unless name raise ArgumentError.new('Persister may not be nil') unless persister raise(MalformedTaskPersisterError.new('The supplied IO object must respond to #read_tasks')) unless persister.respond_to? :read_tasks raise(MalformedTaskPersisterError.new('The supplied IO object must respond to #update_task')) unless persister.respond_to? :update_task raise(MalformedTaskPersisterError.new('The supplied IO object must respond to #delete_task')) unless persister.respond_to? :delete_task @name = name.to_s.gsub(/\s/, '_').to_sym @timeout = timeout @max_attempts = max_attempts @update_period = update_period @max_tasks = max_tasks @persister = persister end |
Instance Attribute Details
#max_attempts ⇒ Object (readonly)
Returns the value of attribute max_attempts.
8 9 10 |
# File 'lib/procrastinator/queue_worker.rb', line 8 def max_attempts @max_attempts end |
#max_tasks ⇒ Object (readonly)
Returns the value of attribute max_tasks.
8 9 10 |
# File 'lib/procrastinator/queue_worker.rb', line 8 def max_tasks @max_tasks end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
8 9 10 |
# File 'lib/procrastinator/queue_worker.rb', line 8 def name @name end |
#timeout ⇒ Object (readonly)
Returns the value of attribute timeout.
8 9 10 |
# File 'lib/procrastinator/queue_worker.rb', line 8 def timeout @timeout end |
#update_period ⇒ Object (readonly)
Returns the value of attribute update_period.
8 9 10 |
# File 'lib/procrastinator/queue_worker.rb', line 8 def update_period @update_period end |
Instance Method Details
#work ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/procrastinator/queue_worker.rb', line 33 def work loop do sleep(@update_period) # shuffling and re-sorting to avoid worst case O(n^2) on quicksort # when receiving already sorted data. Ideally, we'd use a better algo, but this will do for now tasks = @persister.read_tasks(@name).shuffle.sort_by { |t| t[:run_at] } tasks.first(@max_tasks).each do |task_data| if Time.now.to_i >= task_data[:run_at].to_i tw = TaskWorker.new(task_data) tw.work if tw.successful? @persister.delete_task(task_data[:id]) else @persister.update_task(tw.to_hash.merge(queue: @name)) end end end end end |