Class: Procrastinator::QueueWorker
- Inherits:
-
Object
- Object
- Procrastinator::QueueWorker
- Defined in:
- lib/procrastinator/queue_worker.rb
Constant Summary collapse
- DEFAULT_TIMEOUT = 3600 (in seconds; one hour total)
- DEFAULT_MAX_ATTEMPTS = 20
- DEFAULT_UPDATE_PERIOD = 10 (seconds)
- DEFAULT_MAX_TASKS = 10
Instance Attribute Summary collapse
-
#max_attempts ⇒ Object
readonly
Returns the value of attribute max_attempts.
-
#max_tasks ⇒ Object
readonly
Returns the value of attribute max_tasks.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#timeout ⇒ Object
readonly
Returns the value of attribute timeout.
-
#update_period ⇒ Object
readonly
Returns the value of attribute update_period.
Instance Method Summary collapse
- #act ⇒ Object
-
#initialize(name:, persister:, log_dir: nil, log_level: Logger::INFO, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, max_tasks: DEFAULT_MAX_TASKS) ⇒ QueueWorker
constructor
Timeout is in seconds.
- #log_parent_exit ⇒ Object
- #long_name ⇒ Object
-
#start_log ⇒ Object
Starts a log file and stores the logger within this queue worker.
- #work ⇒ Object
Constructor Details
#initialize(name:, persister:, log_dir: nil, log_level: Logger::INFO, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, max_tasks: DEFAULT_MAX_TASKS) ⇒ QueueWorker
Timeout is in seconds
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/procrastinator/queue_worker.rb', line 11
#
# Builds a worker for one named queue. Timeout is in seconds.
#
# @param name [#to_s] queue name; whitespace characters are replaced with
#   underscores and the result is stored as a Symbol
# @param persister [#read_tasks, #update_task, #delete_task] task storage object
# @param log_dir [String, nil] directory for the log file; stored as-is
# @param log_level [Integer] logger severity threshold; stored as-is
# @param max_attempts [Integer] stored and exposed through #max_attempts
# @param timeout [Integer] in seconds; stored and exposed through #timeout
# @param update_period [Integer] stored and exposed through #update_period
# @param max_tasks [Integer] stored and exposed through #max_tasks
# @raise [ArgumentError] when name or persister is nil (or false)
# @raise [MalformedTaskPersisterError] when persister lacks one of the
#   required methods
def initialize(name:,
               persister:,
               log_dir: nil,
               log_level: Logger::INFO,
               max_attempts: DEFAULT_MAX_ATTEMPTS,
               timeout: DEFAULT_TIMEOUT,
               update_period: DEFAULT_UPDATE_PERIOD,
               max_tasks: DEFAULT_MAX_TASKS)
  raise ArgumentError, 'Queue name may not be nil' unless name
  raise ArgumentError, 'Persister may not be nil' unless persister

  # Checked in this order so the first missing method is the one reported.
  %i[read_tasks update_task delete_task].each do |required|
    next if persister.respond_to?(required)

    raise MalformedTaskPersisterError, "The supplied IO object must respond to ##{required}"
  end

  # Normalise the queue name: "my queue" becomes :my_queue.
  @name          = name.to_s.gsub(/\s/, '_').to_sym
  @timeout       = timeout
  @max_attempts  = max_attempts
  @update_period = update_period
  @max_tasks     = max_tasks
  @persister     = persister
  @log_dir       = log_dir
  @log_level     = log_level
end
Instance Attribute Details
#max_attempts ⇒ Object (readonly)
Returns the value of attribute max_attempts.
8 9 10 |
# File 'lib/procrastinator/queue_worker.rb', line 8
#
# Read-only accessor.
#
# @return [Object] the value currently held in @max_attempts
def max_attempts
  @max_attempts
end
#max_tasks ⇒ Object (readonly)
Returns the value of attribute max_tasks.
8 9 10 |
# File 'lib/procrastinator/queue_worker.rb', line 8
#
# Read-only accessor.
#
# @return [Object] the value currently held in @max_tasks
def max_tasks
  @max_tasks
end
#name ⇒ Object (readonly)
Returns the value of attribute name.
8 9 10 |
# File 'lib/procrastinator/queue_worker.rb', line 8
#
# Read-only accessor.
#
# @return [Object] the value currently held in @name
def name
  @name
end
#timeout ⇒ Object (readonly)
Returns the value of attribute timeout.
8 9 10 |
# File 'lib/procrastinator/queue_worker.rb', line 8
#
# Read-only accessor.
#
# @return [Object] the value currently held in @timeout
def timeout
  @timeout
end
#update_period ⇒ Object (readonly)
Returns the value of attribute update_period.
8 9 10 |
# File 'lib/procrastinator/queue_worker.rb', line 8
#
# Read-only accessor.
#
# @return [Object] the value currently held in @update_period
def update_period
  @update_period
end
Instance Method Details
#act ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
# File 'lib/procrastinator/queue_worker.rb', line 44
#
# Runs one polling pass over this worker's queue.
#
# Reads every task for @name from the persister, orders them by :run_at,
# and looks at the first @max_tasks of them. Each task whose :run_at is not
# in the future is handed to a TaskWorker; a successful task is deleted from
# the persister, an unsuccessful one is written back with its queue name.
#
# @return [Array<Hash>] the tasks that were examined during this pass
def act
  # shuffling and re-sorting to avoid worst case O(n^2) on quicksort
  # when receiving already sorted data. Ideally, we'd use a better algo, but this will do for now
  #
  # The sort key goes through #to_i, the same coercion the due-check below
  # applies. Sorting on the raw value raised ArgumentError as soon as nil and
  # Integer timestamps were mixed, and ordered String timestamps
  # lexicographically ("9" after "10").
  ordered = @persister.read_tasks(@name).shuffle.sort_by { |task| task[:run_at].to_i }

  ordered.first(@max_tasks).each do |task_data|
    # Clock is re-read per task because earlier tasks may have taken a while.
    next if Time.now.to_i < task_data[:run_at].to_i

    task_data.merge!(logger: @logger) if @logger

    worker = TaskWorker.new(task_data)
    worker.work

    if worker.successful?
      @persister.delete_task(task_data[:id])
    else
      @persister.update_task(worker.to_hash.merge(queue: @name))
    end
  end
end
#log_parent_exit ⇒ Object
94 95 96 97 98 |
# File 'lib/procrastinator/queue_worker.rb', line 94
#
# Writes an error-level log entry stating that this worker process was
# terminated because its parent process disappeared. The entry includes the
# current pid and parent pid.
#
# @raise [RuntimeError] when no logger has been stored in @logger yet
def log_parent_exit
  # A bare `raise 'message'` raises RuntimeError, same as the explicit form.
  raise 'Cannot log when logger not defined. Call #start_log first.' unless @logger

  @logger.error("Terminated worker process (#{Process.pid}) due to main process (#{Process.ppid}) disappearing.")
end
#long_name ⇒ Object
66 67 68 |
# File 'lib/procrastinator/queue_worker.rb', line 66
#
# Descriptive label for this worker: the queue name followed by
# "-queue-worker".
#
# @return [String]
def long_name
  @name.to_s + '-queue-worker'
end
#start_log ⇒ Object
Starts a log file and stores the logger within this queue worker.
Separate from init because logging is context-dependent
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/procrastinator/queue_worker.rb', line 73
#
# Starts a log file and stores the logger within this queue worker.
# Separate from init because logging is context-dependent.
#
# Does nothing when @log_dir is not set. Otherwise it creates the directory
# (and any missing parents), opens "<log_dir>/<long_name>.log" in append
# mode, stores a Logger for it in @logger at @log_level, and writes a
# start-up banner containing the worker name, queue name, pid and parent pid.
def start_log
  return unless @log_dir

  worker_label = long_name
  log_file = Pathname.new("#{@log_dir}/#{worker_label}.log")
  log_file.dirname.mkpath

  # Zero-byte append: creates the file if absent and leaves existing content
  # alone. NOTE(review): presumably done so the file exists before Logger
  # opens it - confirm before removing.
  File.open(log_file.to_path, 'a+') { |file| file.write '' }

  @logger = Logger.new(log_file.to_path)
  @logger.level = @log_level

  banner = ['',
            '===================================',
            "Started worker process, #{worker_label}, to work off queue #{@name}.",
            "Worker pid=#{Process.pid}; parent pid=#{Process.ppid}.",
            '==================================='].join("\n")
  @logger.info(banner)
end
#work ⇒ Object
36 37 38 39 40 41 42 |
# File 'lib/procrastinator/queue_worker.rb', line 36
#
# Main polling loop: sleeps for @update_period, calls #act, and repeats
# indefinitely. The sleep comes first, so the first pass happens one full
# period after the loop starts.
def work
  loop do
    sleep(@update_period)

    act
  end
end