Class: Worker::Queue
- Inherits:
-
Object
- Object
- Worker::Queue
- Defined in:
- lib/worker/queue.rb
Instance Attribute Summary collapse
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#jobs_per_worker ⇒ Object
Returns the value of attribute jobs_per_worker.
-
#log_backtrace ⇒ Object
Returns the value of attribute log_backtrace.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#max_attempts ⇒ Object
Returns the value of attribute max_attempts.
-
#max_job_time ⇒ Object
Returns the value of attribute max_job_time.
-
#poll_interval ⇒ Object
Returns the value of attribute poll_interval.
-
#queue_name ⇒ Object
Returns the value of attribute queue_name.
Instance Method Summary collapse
-
#clean_locks ⇒ Object
cleans locked by jobs that have been dormant for a while.
- #create_table ⇒ Object
- #db ⇒ Object
-
#dequeue ⇒ Object
locks a set of jobs for this worker.
- #enqueue(job) ⇒ Object
-
#initialize(opts = {}) ⇒ Queue
constructor
A new instance of Queue.
- #remove(id) ⇒ Object
- #run(number_to_run = nil) ⇒ Object
- #set_error(job, error) ⇒ Object
- #unlock_all ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ Queue
Returns a new instance of Queue.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/worker/queue.rb', line 10 def initialize(opts={}) @logger = Logger.new(STDOUT) db_config = opts[:db_config] || Worker.db_config db_config.merge(logger: logger) if opts[:log_queries] @db = Sequel.connect(db_config) name = `hostname`.chomp("\n") @id = "#{name}.#{Process.pid}.#{rand(0..1000)}-#{opts[:queue_name] || 'all'}" @jobs_per_worker = opts[:jobs_per_worker] || 5 @max_job_time = opts[:max_job_time] || 5 * 60 # in seconds @queue_name = opts[:queue_name] @max_attempts = opts[:max_attempts] || 20 @poll_interval = opts[:poll_interval] || 5 # 5 second polling @log_backtrace = !!opts[:log_backtrace] create_table end |
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id.
7 8 9 |
# File 'lib/worker/queue.rb', line 7 def id @id end |
#jobs_per_worker ⇒ Object
Returns the value of attribute jobs_per_worker.
8 9 10 |
# File 'lib/worker/queue.rb', line 8 def jobs_per_worker @jobs_per_worker end |
#log_backtrace ⇒ Object
Returns the value of attribute log_backtrace.
8 9 10 |
# File 'lib/worker/queue.rb', line 8 def log_backtrace @log_backtrace end |
#logger ⇒ Object
Returns the value of attribute logger.
8 9 10 |
# File 'lib/worker/queue.rb', line 8 def logger @logger end |
#max_attempts ⇒ Object
Returns the value of attribute max_attempts.
8 9 10 |
# File 'lib/worker/queue.rb', line 8 def max_attempts @max_attempts end |
#max_job_time ⇒ Object
Returns the value of attribute max_job_time.
8 9 10 |
# File 'lib/worker/queue.rb', line 8 def max_job_time @max_job_time end |
#poll_interval ⇒ Object
Returns the value of attribute poll_interval.
8 9 10 |
# File 'lib/worker/queue.rb', line 8 def poll_interval @poll_interval end |
#queue_name ⇒ Object
Returns the value of attribute queue_name.
8 9 10 |
# File 'lib/worker/queue.rb', line 8 def queue_name @queue_name end |
Instance Method Details
#clean_locks ⇒ Object
cleans locked by jobs that have been dormant for a while
59 60 61 |
# File 'lib/worker/queue.rb', line 59 def clean_locks db[:worker_jobs].where('locked_at < ?', Time.now - max_job_time).update(locked_by: nil) end |
#create_table ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/worker/queue.rb', line 33 def create_table db.create_table? :worker_jobs do primary_key :id Time :locked_at, null: true Time :run_at, null: true String :locked_by, null: true String :handler, null: false, text: true String :last_error, null: true, text: true String :queue, null: false, default: 'default' String :job_class, null: false Integer :attempts, null: false, default: 0 Integer :priority, null: false, default: 0 end end |
#db ⇒ Object
92 93 94 |
# File 'lib/worker/queue.rb', line 92 def db @db end |
#dequeue ⇒ Object
locks a set of jobs for this worker
64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/worker/queue.rb', line 64 def dequeue query = db[:worker_jobs] .where( '`run_at` < ? AND `locked_by` IS NULL AND `id` in (SELECT `id` FROM `worker_jobs` ORDER BY `priority` DESC LIMIT ?)', Time.now, jobs_per_worker) if queue_name query = query.where(queue: queue_name) end query.update(locked_by: id, locked_at: Time.now) db[:worker_jobs].where(locked_by: id) end |
#enqueue(job) ⇒ Object
78 79 80 81 82 83 84 85 86 |
# File 'lib/worker/queue.rb', line 78 def enqueue(job) db[:worker_jobs].insert( job_class: job.class.name, queue: job.queue, priority: job.priority, handler: job.serialize, run_at: job.run_at, ) end |
#remove(id) ⇒ Object
54 55 56 |
# File 'lib/worker/queue.rb', line 54 def remove(id) db[:worker_jobs].where(id: id).delete end |
#run(number_to_run = nil) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 |
# File 'lib/worker/queue.rb', line 96 def run(number_to_run=nil) begin while true logger.debug("Polling") t1 = Time.now failures = 0 successes = 0 dequeue.each do |item| if number_to_run number_to_run -= 1 end logger.info("Performing #{item[:job_class]}.#{item[:id]} attempts=#{item[:attempts]} priority=#{item[:priority]} run_at=#{item[:run_at]}") job = nil begin job = eval(item[:job_class]).deserialize(item[:handler]) Timeout::timeout(max_job_time) do job.before job.perform job.on_success end remove(item[:id]) successes += 1 logger.info("Successful #{item[:job_class]}.#{item[:id]}") rescue Exception => e job.on_failure if job if item[:attempts] + 1 > max_attempts remove(item[:id]) else set_error(item, e) end failures += 1 logger.info("Failed #{item[:job_class]}.#{item[:id]} err: #{e.class.name} #{e.}") if log_backtrace puts e.backtrace end end end t2 = Time.now if failures + successes > 0 logger.info("Performed #{failures + successes} jobs in #{t2 - t1} seconds") end return if number_to_run <= 0 sleep(poll_interval) end rescue StandardError => e logger.warn("Exiting due to #{e}") return ensure logger.info("Unlocking") unlock_all end end |
#set_error(job, error) ⇒ Object
48 49 50 51 52 |
# File 'lib/worker/queue.rb', line 48 def set_error(job, error) db[:worker_jobs] .where(id: job[:id]) .update(last_error: error.backtrace.join("\n"), run_at: Time.now + 10) end |
#unlock_all ⇒ Object
88 89 90 |
# File 'lib/worker/queue.rb', line 88 def unlock_all db[:worker_jobs].where(locked_by: id).update(locked_by: nil) end |