Class: Delayed::HomeManager
- Inherits:
-
Object
- Object
- Delayed::HomeManager
- Includes:
- Celluloid
- Defined in:
- lib/delayed/home_manager.rb
Constant Summary collapse
- DEFAULT_SLEEP_TIME =
2- DEFAULT_JOBS_TO_PULL =
20- DEFAULT_WORKERS_NUMBER =
16- DEFAULT_TIMEOUT =
20- DEFAULT_MAX_ATTEMPTS =
25
Instance Attribute Summary collapse
-
#alive ⇒ Object
Returns the value of attribute alive.
-
#hostname ⇒ Object
Returns the value of attribute hostname.
-
#max_attempts ⇒ Object
Returns the value of attribute max_attempts.
-
#queue ⇒ Object
Returns the value of attribute queue.
-
#sleep_time ⇒ Object
Returns the value of attribute sleep_time.
-
#timeout ⇒ Object
Returns the value of attribute timeout.
-
#timer ⇒ Object
Returns the value of attribute timer.
-
#worker_options ⇒ Object
Returns the value of attribute worker_options.
-
#workers_number ⇒ Object
Returns the value of attribute workers_number.
-
#workers_pool ⇒ Object
Returns the value of attribute workers_pool.
Instance Method Summary collapse
-
#initialize(options = nil) ⇒ HomeManager
constructor
A new instance of HomeManager.
- #kill ⇒ Object
-
#pull_next(queue = nil, n = 15) ⇒ Object
pull n items from Delayed::Job locks jobs until they’re processed (the worker then deletes the job).
- #start ⇒ Object
-
#unlock_all ⇒ Object
Unlock all jobs locked by our hostname in prior attempts.
Constructor Details
#initialize(options = nil) ⇒ HomeManager
Returns a new instance of HomeManager.
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/delayed/home_manager.rb', line 17 def initialize(=nil) self.alive = true if ( && .class == Hash) @sleep_time = [:sleep_time] @workers_number = [:workers_number] = [:worker_options] @queue = [:queue] @timeout = [:timeout] @max_attempts = [:max_attempts] end @sleep_time ||= DEFAULT_SLEEP_TIME @timeout ||= DEFAULT_TIMEOUT @workers_number ||= DEFAULT_WORKERS_NUMBER @max_attempts ||= DEFAULT_MAX_ATTEMPTS ||= {} # if @queue is nil, we'll pull from all queues end |
Instance Attribute Details
#alive ⇒ Object
Returns the value of attribute alive.
15 16 17 |
# File 'lib/delayed/home_manager.rb', line 15 def alive @alive end |
#hostname ⇒ Object
Returns the value of attribute hostname.
15 16 17 |
# File 'lib/delayed/home_manager.rb', line 15 def hostname @hostname end |
#max_attempts ⇒ Object
Returns the value of attribute max_attempts.
15 16 17 |
# File 'lib/delayed/home_manager.rb', line 15 def max_attempts @max_attempts end |
#queue ⇒ Object
Returns the value of attribute queue.
15 16 17 |
# File 'lib/delayed/home_manager.rb', line 15 def queue @queue end |
#sleep_time ⇒ Object
Returns the value of attribute sleep_time.
15 16 17 |
# File 'lib/delayed/home_manager.rb', line 15 def sleep_time @sleep_time end |
#timeout ⇒ Object
Returns the value of attribute timeout.
15 16 17 |
# File 'lib/delayed/home_manager.rb', line 15 def timeout @timeout end |
#timer ⇒ Object
Returns the value of attribute timer.
15 16 17 |
# File 'lib/delayed/home_manager.rb', line 15 def timer @timer end |
#worker_options ⇒ Object
Returns the value of attribute worker_options.
15 16 17 |
# File 'lib/delayed/home_manager.rb', line 15 def end |
#workers_number ⇒ Object
Returns the value of attribute workers_number.
15 16 17 |
# File 'lib/delayed/home_manager.rb', line 15 def workers_number @workers_number end |
#workers_pool ⇒ Object
Returns the value of attribute workers_pool.
15 16 17 |
# File 'lib/delayed/home_manager.rb', line 15 def workers_pool @workers_pool end |
Instance Method Details
#kill ⇒ Object
105 106 107 |
# File 'lib/delayed/home_manager.rb', line 105 def kill @alive = false end |
#pull_next(queue = nil, n = 15) ⇒ Object
pull n items from Delayed::Job locks jobs until they’re processed (the worker then deletes the job)
79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
# File 'lib/delayed/home_manager.rb', line 79 def pull_next(queue=nil, n=15) ids = [] Delayed::Job.transaction do query = Delayed::Job.where('(run_at is null or run_at < ?) and locked_at is null', DateTime.now).order('priority asc, run_at asc, id asc') if (queue && queue != 'default') query = query.where(:queue => queue) # if the queue is 'default' or nil, this would be the "default queue" elsif queue == 'default' query = query.where(:queue => ['default', nil]) end # if no queue name is provided, the "queue" column would be ignored # therefore returning the list of all jobs in all queues query = query.limit(n) ids = query.pluck(:id) query.update_all(:locked_at => DateTime.now.utc, :locked_by => hostname) end return Delayed::Job.where(:id => ids) end |
#start ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/delayed/home_manager.rb', line 38 def start # use Celluloid to create a pool of size @workers_number # worker_options get passed to HomeWorker's initializer @workers_pool = HomeWorker.pool(:size => @workers_number, :args => ) begin # make sure jobs locked by our hostname in prior attempts are unlocked unlock_all rescue DelayedJobActiveRecordThreaded.logger.error($!.) if DelayedJobActiveRecordThreaded.logger DelayedJobActiveRecordThreaded.logger.error($!.backtrace.join("\n")) if DelayedJobActiveRecordThreaded.logger end @timer = every(@sleep_time) { begin if (!@alive) @timer.cancel else jobs = pull_next(@queue, @workers_pool.idle_size) jobs.each { |j| @workers_pool.async.work(j) } end rescue # logging error watch begin DelayedJobActiveRecordThreaded.logger.error($!.) if DelayedJobActiveRecordThreaded.logger DelayedJobActiveRecordThreaded.logger.error($!.backtrace.join("\n")) if DelayedJobActiveRecordThreaded.logger rescue end end } end |
#unlock_all ⇒ Object
Unlock all jobs locked by our hostname in prior attempts
71 72 73 74 75 |
# File 'lib/delayed/home_manager.rb', line 71 def unlock_all Delayed::Job.transaction do Delayed::Job.where(:locked_by => hostname).update_all(:locked_by => nil, :locked_at => nil) end end |