Class: Worker::Queue
- Inherits:
-
Object
- Object
- Worker::Queue
- Defined in:
- lib/worker/queue.rb
Instance Attribute Summary collapse
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#jobs_per_worker ⇒ Object
Returns the value of attribute jobs_per_worker.
-
#log_backtrace ⇒ Object
Returns the value of attribute log_backtrace.
-
#logger ⇒ Object
Returns the value of attribute logger.
-
#max_attempts ⇒ Object
Returns the value of attribute max_attempts.
-
#max_job_time ⇒ Object
Returns the value of attribute max_job_time.
-
#poll_interval ⇒ Object
Returns the value of attribute poll_interval.
-
#queue_name ⇒ Object
Returns the value of attribute queue_name.
Instance Method Summary collapse
-
#clean_locks ⇒ Object
cleans locked by jobs that have been dormant for a while.
- #create_table ⇒ Object
- #db ⇒ Object
-
#dequeue ⇒ Object
locks a set of jobs for this worker.
- #enqueue(job) ⇒ Object
-
#initialize(opts = {}) ⇒ Queue
constructor
A new instance of Queue.
- #remove(id) ⇒ Object
- #run(number_to_run = nil) ⇒ Object
- #set_error(job, error) ⇒ Object
- #unlock_all ⇒ Object
Constructor Details
#initialize(opts = {}) ⇒ Queue
Returns a new instance of Queue.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/worker/queue.rb', line 10 def initialize(opts={}) @logger = Logger.new(STDOUT) db_config = opts[:db_config] || Worker.db db_config.merge(logger: logger) if opts[:log_queries] @db = Sequel.connect(db_config) name = `hostname`.chomp("\n") @id = "#{name}.#{Process.pid}.#{rand(0..1000)}-#{opts[:queue_name] || 'all'}" @jobs_per_worker = opts[:jobs_per_worker] || 5 @max_job_time = opts[:max_job_time] || 5 * 60 # in seconds @queue_name = opts[:queue_name] @max_attempts = opts[:max_attempts] || 20 @poll_interval = opts[:poll_interval] || 5 # 5 second polling @log_backtrace = !!opts[:log_backtrace] @db_adapter = db_config[:adapter] create_table end |
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id
7 8 9 |
# File 'lib/worker/queue.rb', line 7 def id @id end |
#jobs_per_worker ⇒ Object
Returns the value of attribute jobs_per_worker
8 9 10 |
# File 'lib/worker/queue.rb', line 8 def jobs_per_worker @jobs_per_worker end |
#log_backtrace ⇒ Object
Returns the value of attribute log_backtrace
8 9 10 |
# File 'lib/worker/queue.rb', line 8 def log_backtrace @log_backtrace end |
#logger ⇒ Object
Returns the value of attribute logger
8 9 10 |
# File 'lib/worker/queue.rb', line 8 def logger @logger end |
#max_attempts ⇒ Object
Returns the value of attribute max_attempts
8 9 10 |
# File 'lib/worker/queue.rb', line 8 def max_attempts @max_attempts end |
#max_job_time ⇒ Object
Returns the value of attribute max_job_time
8 9 10 |
# File 'lib/worker/queue.rb', line 8 def max_job_time @max_job_time end |
#poll_interval ⇒ Object
Returns the value of attribute poll_interval
8 9 10 |
# File 'lib/worker/queue.rb', line 8 def poll_interval @poll_interval end |
#queue_name ⇒ Object
Returns the value of attribute queue_name
8 9 10 |
# File 'lib/worker/queue.rb', line 8 def queue_name @queue_name end |
Instance Method Details
#clean_locks ⇒ Object
cleans locked by jobs that have been dormant for a while
60 61 62 |
# File 'lib/worker/queue.rb', line 60 def clean_locks db[:worker_jobs].where('locked_at < ?', Time.now - max_job_time).update(locked_by: nil) end |
#create_table ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/worker/queue.rb', line 34 def create_table db.create_table? :worker_jobs do primary_key :id Time :locked_at, null: true Time :run_at, null: true String :locked_by, null: true String :handler, null: false, text: true String :last_error, null: true, text: true String :queue, null: false, default: 'default' String :job_class, null: false Integer :attempts, null: false, default: 0 Integer :priority, null: false, default: 0 end end |
#db ⇒ Object
102 103 104 |
# File 'lib/worker/queue.rb', line 102 def db @db end |
#dequeue ⇒ Object
locks a set of jobs for this worker
65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/worker/queue.rb', line 65 def dequeue if @db_adapter == 'mysql2' || @db_adapter == 'mysql' query = db[:worker_jobs] .limit(jobs_per_worker) .where('`run_at` < ?', Time.now) .where('`locked_by` IS NULL') .reverse_order('priority') else query = db[:worker_jobs] .where( '`run_at` < ? AND `locked_by` IS NULL AND `id` in (SELECT `id` FROM `worker_jobs` ORDER BY `priority` DESC LIMIT ?)', Time.now, jobs_per_worker) end if queue_name query = query.where(queue: queue_name) end query.update(locked_by: id, locked_at: Time.now) db[:worker_jobs].where(locked_by: id) end |
#enqueue(job) ⇒ Object
88 89 90 91 92 93 94 95 96 |
# File 'lib/worker/queue.rb', line 88 def enqueue(job) db[:worker_jobs].insert( job_class: job.class.name, queue: job.queue, priority: job.priority, handler: job.serialize, run_at: job.run_at, ) end |
#remove(id) ⇒ Object
55 56 57 |
# File 'lib/worker/queue.rb', line 55 def remove(id) db[:worker_jobs].where(id: id).delete end |
#run(number_to_run = nil) ⇒ Object
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/worker/queue.rb', line 106 def run(number_to_run=nil) begin while true logger.debug("Polling") t1 = Time.now failures = 0 successes = 0 dequeue.each do |item| if number_to_run number_to_run -= 1 end logger.info("Performing #{item[:job_class]}.#{item[:id]} attempts=#{item[:attempts]} priority=#{item[:priority]} run_at=#{item[:run_at]}") job = nil begin job = eval(item[:job_class]).deserialize(item[:handler]) Timeout::timeout(max_job_time) do job.before job.perform job.on_success end remove(item[:id]) successes += 1 logger.info("Successful #{item[:job_class]}.#{item[:id]}") rescue Exception => e job.on_failure if job if item[:attempts] + 1 > max_attempts remove(item[:id]) else set_error(item, e) end failures += 1 logger.info("Failed #{item[:job_class]}.#{item[:id]} err: #{e.class.name} #{e.}") if log_backtrace puts e.backtrace end end end t2 = Time.now if failures + successes > 0 logger.info("Performed #{failures + successes} jobs in #{t2 - t1} seconds") end return if number_to_run && number_to_run <= 0 sleep(poll_interval) end rescue StandardError => e logger.warn("Exiting due to #{e}") return ensure logger.info("Unlocking") unlock_all end end |
#set_error(job, error) ⇒ Object
49 50 51 52 53 |
# File 'lib/worker/queue.rb', line 49 def set_error(job, error) db[:worker_jobs] .where(id: job[:id]) .update(last_error: error.backtrace.join("\n"), run_at: Time.now + 10) end |
#unlock_all ⇒ Object
98 99 100 |
# File 'lib/worker/queue.rb', line 98 def unlock_all db[:worker_jobs].where(locked_by: id).update(locked_by: nil) end |