Class: Procrastinator::QueueWorker
- Inherits:
-
Object
- Object
- Procrastinator::QueueWorker
- Defined in:
- lib/procrastinator/queue_worker.rb
Constant Summary collapse
- DEFAULT_TIMEOUT =
in seconds; one hour total
3600
- DEFAULT_MAX_ATTEMPTS =
20
- DEFAULT_UPDATE_PERIOD =
in seconds
10
- DEFAULT_MAX_TASKS =
10
Instance Attribute Summary collapse
-
#max_attempts ⇒ Object
readonly
Returns the value of attribute max_attempts.
-
#max_tasks ⇒ Object
readonly
Returns the value of attribute max_tasks.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#timeout ⇒ Object
readonly
Returns the value of attribute timeout.
-
#update_period ⇒ Object
readonly
Returns the value of attribute update_period.
Instance Method Summary collapse
- #act ⇒ Object
-
#initialize(name:, persister:, log_dir: nil, log_level: Logger::INFO, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, max_tasks: DEFAULT_MAX_TASKS) ⇒ QueueWorker
constructor
Timeout is in seconds.
- #log_parent_exit ⇒ Object
- #long_name ⇒ Object
-
#start_log ⇒ Object
Starts a log file and stores the logger within this queue worker.
- #work ⇒ Object
Constructor Details
#initialize(name:, persister:, log_dir: nil, log_level: Logger::INFO, max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT, update_period: DEFAULT_UPDATE_PERIOD, max_tasks: DEFAULT_MAX_TASKS) ⇒ QueueWorker
Timeout is in seconds
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/procrastinator/queue_worker.rb', line 11
# Builds a worker for a single named queue, then opens its log via #start_log.
#
# @param name [#to_s] queue name; stored as a Symbol with every whitespace
#   character replaced by an underscore
# @param persister [Object] task store; must respond to #read_tasks,
#   #update_task and #delete_task
# @param log_dir [String, nil] directory for the log file; when nil,
#   #start_log creates no logger
# @param log_level [Integer] level assigned to the logger by #start_log
# @param max_attempts [Integer] stored and exposed through #max_attempts
# @param timeout [Integer] in seconds; stored and exposed through #timeout
# @param update_period [Integer] seconds slept between passes in #work
# @param max_tasks [Integer] most tasks considered by a single #act pass
# @raise [ArgumentError] when name or persister is missing
# @raise [MalformedTaskPersisterError] when the persister lacks a required method
def initialize(name:, persister:, log_dir: nil, log_level: Logger::INFO,
               max_attempts: DEFAULT_MAX_ATTEMPTS, timeout: DEFAULT_TIMEOUT,
               update_period: DEFAULT_UPDATE_PERIOD, max_tasks: DEFAULT_MAX_TASKS)
  raise ArgumentError, 'Queue name may not be nil' unless name
  raise ArgumentError, 'Persister may not be nil' unless persister

  # Checked in this order so the first missing method is the one reported.
  %i[read_tasks update_task delete_task].each do |required|
    next if persister.respond_to?(required)

    raise MalformedTaskPersisterError, "The supplied IO object must respond to ##{required}"
  end

  @persister     = persister
  @name          = name.to_s.gsub(/\s/, '_').to_sym
  @max_tasks     = max_tasks
  @max_attempts  = max_attempts
  @timeout       = timeout
  @update_period = update_period
  @log_level     = log_level
  @log_dir       = log_dir

  start_log
end
Instance Attribute Details
#max_attempts ⇒ Object (readonly)
Returns the value of attribute max_attempts.
8 9 10 |
# File 'lib/procrastinator/queue_worker.rb', line 8 def max_attempts @max_attempts end |
#max_tasks ⇒ Object (readonly)
Returns the value of attribute max_tasks.
8 9 10 |
# File 'lib/procrastinator/queue_worker.rb', line 8 def max_tasks @max_tasks end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
8 9 10 |
# File 'lib/procrastinator/queue_worker.rb', line 8 def name @name end |
#timeout ⇒ Object (readonly)
Returns the value of attribute timeout.
8 9 10 |
# File 'lib/procrastinator/queue_worker.rb', line 8 def timeout @timeout end |
#update_period ⇒ Object (readonly)
Returns the value of attribute update_period.
8 9 10 |
# File 'lib/procrastinator/queue_worker.rb', line 8 def update_period @update_period end |
Instance Method Details
#act ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/procrastinator/queue_worker.rb', line 51
# Performs one pass over this queue's tasks.
#
# Reads the queue's tasks from the persister, ignores any without a :run_at,
# orders the rest by :run_at and looks at the first @max_tasks of them. Each
# task whose :run_at (in whole seconds) is not in the future is handed to a
# TaskWorker; afterwards the task is deleted from the persister if the worker
# reports success, or updated with the worker's hash (plus this queue's name)
# otherwise.
#
# @return [Array<Hash>] the tasks considered in this pass
def act
  scheduled = @persister.read_tasks(@name).reject { |t| t[:run_at].nil? }

  # Shuffle before sorting: the default Ruby sort is quicksort, and already
  # sorted input would hit its O(n^2) worst case. A better algorithm would be
  # ideal, but this will do for now.
  by_run_time = scheduled.shuffle.sort_by { |t| t[:run_at] }

  by_run_time.first(@max_tasks).each do |task_data|
    # Clock is re-read per task because earlier tasks may have taken a while.
    next if Time.now.to_i < task_data[:run_at].to_i

    task_data.merge!(logger: @logger) if @logger

    worker = TaskWorker.new(task_data)
    worker.work

    if worker.successful?
      @persister.delete_task(task_data[:id])
    else
      @persister.update_task(worker.to_hash.merge(queue: @name))
    end
  end
end
#log_parent_exit ⇒ Object
101 102 103 104 105 |
# File 'lib/procrastinator/queue_worker.rb', line 101
# Writes an error-level log entry saying this worker process was terminated
# because its parent process disappeared. Only logs; it does not terminate
# anything itself.
#
# @raise [RuntimeError] if no logger has been created yet (see #start_log)
def log_parent_exit
  unless @logger
    raise 'Cannot log when logger not defined. Call #start_log first.'
  end

  worker_pid = Process.pid
  parent_pid = Process.ppid
  @logger.error("Terminated worker process (#{worker_pid}) due to main process (#{parent_pid}) disappearing.")
end
#long_name ⇒ Object
73 74 75 |
# File 'lib/procrastinator/queue_worker.rb', line 73
# Descriptive name for this worker: the queue name followed by
# "-queue-worker". #start_log uses it for the log file name.
#
# @return [String]
def long_name
  queue = @name
  "#{queue}-queue-worker"
end
#start_log ⇒ Object
Starts a log file and stores the logger within this queue worker.
Separate from init because logging is context-dependent
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/procrastinator/queue_worker.rb', line 80
# Opens "<log_dir>/<long_name>.log" (creating the directory and file when
# missing), stores a Logger for it in @logger at the configured level, and
# writes a start-up banner with the worker and parent pids.
#
# Does nothing when no log directory was configured, in which case @logger
# is left unset.
def start_log
  return unless @log_dir

  log_path = Pathname.new("#{@log_dir}/#{long_name}.log")
  log_file = log_path.to_path

  log_path.dirname.mkpath

  # Append mode with an empty write: makes sure the file exists without
  # touching anything already in it.
  File.open(log_file, 'a+') { |file| file.write '' }

  @logger = Logger.new(log_file)
  @logger.level = @log_level

  rule = '==================================='
  banner = ['',
            rule,
            "Started worker process, #{long_name}, to work off queue #{@name}.",
            "Worker pid=#{Process.pid}; parent pid=#{Process.ppid}.",
            rule]
  @logger.info(banner.join("\n"))
end
#work ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/procrastinator/queue_worker.rb', line 38
# Main loop of the worker: sleeps for the update period, runs one #act pass,
# and repeats indefinitely.
#
# The loop ends only when a pass raises a StandardError. With a logger
# available the error is logged at fatal level and the method returns.
# Without one (no log directory was configured) the original error is
# re-raised so the failure is not lost.
#
# @raise [StandardError] the error from #act, only when there is no logger
def work
  loop do
    sleep(@update_period)
    act
  end
rescue StandardError => e
  # Calling @logger.fatal on a nil logger would raise NoMethodError here and
  # mask the real cause, so surface the original error instead.
  raise unless @logger

  @logger.fatal(e)
end