Module: SwitchmanInstJobs::Switchman::Shard
Defined in: lib/switchman_inst_jobs/switchman/shard.rb
Defined Under Namespace
Modules: ClassMethods
Class Method Summary
- .prepended(base) ⇒ Object
Instance Method Summary
- #clear_cache ⇒ Object
- #delayed_jobs_shard ⇒ Object
- #hold_jobs!(wait: false) ⇒ Object
  Adapted from the hold/unhold methods in the delayed jobs base backend. Waiting (wait: true) is required to be able to safely move jobs.
- #unhold_jobs! ⇒ Object
Class Method Details
.prepended(base) ⇒ Object
# File 'lib/switchman_inst_jobs/switchman/shard.rb', line 4
def self.prepended(base)
  base.singleton_class.prepend(ClassMethods)
end
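The prepended hook above extends both the instance side and the class side of the target in one step. A minimal sketch of that pattern follows; ShardExtension, DemoShard, and their methods are hypothetical stand-ins and are not part of switchman or switchman-inst-jobs.

```ruby
# Hypothetical stand-ins; none of these names come from the real gems.
module ShardExtension
  module ClassMethods
    # Class-level override that chains to the original via super.
    def describe
      "extended #{super}"
    end
  end

  # Ruby invokes this hook each time the module is prepended to a class.
  def self.prepended(base)
    base.singleton_class.prepend(ClassMethods)
  end

  # Instance-level override, also chaining to the original.
  def name
    "extended #{super}"
  end
end

class DemoShard
  def self.describe
    "DemoShard"
  end

  def name
    "shard"
  end
end

DemoShard.prepend(ShardExtension)

puts DemoShard.describe  # class-level override is active
puts DemoShard.new.name  # instance-level override is active
```

Because the nested module is prepended to the singleton class, its methods sit ahead of the class's own class methods in the lookup chain, so super reaches the original definitions.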
Instance Method Details
#clear_cache ⇒ Object
# File 'lib/switchman_inst_jobs/switchman/shard.rb', line 8
def clear_cache
  self.class.connection.after_transaction_commit { super }
end
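The override above defers the original clear_cache until the surrounding transaction commits. The sketch below illustrates the idea with a hypothetical FakeConnection that simply queues blocks until commit! is called; the real after_transaction_commit behavior may differ (for example, when no transaction is open), so treat this as an assumption.

```ruby
# Hypothetical connection that queues callbacks until the transaction commits.
class FakeConnection
  def initialize
    @callbacks = []
  end

  def after_transaction_commit(&block)
    @callbacks << block
  end

  # Run and discard every queued callback, oldest first.
  def commit!
    @callbacks.shift.call until @callbacks.empty?
  end
end

class DemoRecord
  CONNECTION = FakeConnection.new
  attr_reader :cache_cleared

  def self.connection
    CONNECTION
  end

  # Original behavior: clear the cache immediately.
  def clear_cache
    @cache_cleared = true
  end
end

module DeferredClear
  # Wrap the original method so it only runs once the commit happens.
  def clear_cache
    self.class.connection.after_transaction_commit { super }
  end
end

DemoRecord.prepend(DeferredClear)

record = DemoRecord.new
record.clear_cache
puts record.cache_cleared.inspect  # not cleared yet, commit is still pending
DemoRecord.connection.commit!
puts record.cache_cleared.inspect  # cleared after the commit
```

Deferring the call this way presumably keeps other processes from re-caching stale data before the change is visible to them.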
#delayed_jobs_shard ⇒ Object
# File 'lib/switchman_inst_jobs/switchman/shard.rb', line 12
def delayed_jobs_shard
  if read_attribute(:delayed_jobs_shard_id)
    shard = ::Switchman::Shard.lookup(delayed_jobs_shard_id)
    return shard if shard
  end
  @delayed_jobs_shard ||= database_server&.delayed_jobs_shard(self)
end
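The method above resolves the jobs shard in two stages: an explicit per-shard id wins if it resolves, otherwise a memoized server-level default is used. A minimal sketch of that resolution order, where SHARD_REGISTRY, DemoServer, and RoutedShard are hypothetical stand-ins for ::Switchman::Shard.lookup, the database server, and the shard record:

```ruby
# Hypothetical registry standing in for ::Switchman::Shard.lookup.
SHARD_REGISTRY = { 2 => :jobs_shard_two }.freeze

# Hypothetical database server that counts how often it is consulted.
class DemoServer
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def delayed_jobs_shard(_shard)
    @calls += 1
    :server_default_jobs_shard
  end
end

class RoutedShard
  attr_reader :delayed_jobs_shard_id

  def initialize(server, delayed_jobs_shard_id = nil)
    @server = server
    @delayed_jobs_shard_id = delayed_jobs_shard_id
  end

  def delayed_jobs_shard
    # Stage 1: an explicit id is used only if it resolves to a shard.
    if delayed_jobs_shard_id
      shard = SHARD_REGISTRY[delayed_jobs_shard_id]
      return shard if shard
    end
    # Stage 2: fall back to the server default, memoized after the first call.
    @delayed_jobs_shard ||= @server&.delayed_jobs_shard(self)
  end
end

server = DemoServer.new
puts RoutedShard.new(server, 2).delayed_jobs_shard.inspect   # explicit id resolves
puts RoutedShard.new(server, 99).delayed_jobs_shard.inspect  # unknown id falls back
puts RoutedShard.new(nil).delayed_jobs_shard.inspect         # no server, so nil
```

Note that only the fallback is memoized; the explicit lookup runs on every call, which matches the structure of the method shown above.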
#hold_jobs!(wait: false) ⇒ Object
Adapted from the hold/unhold methods in the delayed jobs base backend. Waiting (wait: true) is required to be able to safely move jobs.
# File 'lib/switchman_inst_jobs/switchman/shard.rb', line 22
def hold_jobs!(wait: false)
  self.jobs_held = true
  save! if changed?
  delayed_jobs_shard.activate(::Delayed::Backend::ActiveRecord::AbstractJob) do
    lock_jobs_for_hold
  end
  return unless wait

  delayed_jobs_shard.activate(::Delayed::Backend::ActiveRecord::AbstractJob) do
    while ::Delayed::Job.where(shard_id: id).
        where.not(locked_at: nil).
        where.not(locked_by: ::Delayed::Backend::Base::ON_HOLD_LOCKED_BY).exists?
      sleep 10
      lock_jobs_for_hold
    end
  end
end
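The wait branch above is a polling loop: it keeps re-locking jobs for the hold until no job is still locked by something other than the hold marker. The in-memory sketch below mimics that loop. The body of lock_jobs_for_hold is not shown on this page, so lock_for_hold here assumes it marks every currently unlocked job with the hold marker; ON_HOLD, QueuedJob, and hold_all are likewise hypothetical.

```ruby
# Hypothetical hold marker standing in for ON_HOLD_LOCKED_BY.
ON_HOLD = "on hold"

QueuedJob = Struct.new(:locked_by, :locked_at)

# Assumed behavior of lock_jobs_for_hold: put every unlocked job on hold.
def lock_for_hold(jobs)
  jobs.each do |job|
    next if job.locked_at

    job.locked_by = ON_HOLD
    job.locked_at = Time.now
  end
end

# True while some job is locked by a worker rather than by the hold.
def worker_locked?(jobs)
  jobs.any? { |job| job.locked_at && job.locked_by != ON_HOLD }
end

def hold_all(jobs, wait: false, poll: 0.01)
  lock_for_hold(jobs)
  return unless wait

  while worker_locked?(jobs)
    yield if block_given? # lets the caller simulate a worker releasing its job
    sleep poll
    lock_for_hold(jobs)   # catch jobs that were released since the last pass
  end
end

jobs = [QueuedJob.new(nil, nil), QueuedJob.new("worker:1", Time.now)]
hold_all(jobs, wait: true) do
  # Simulate the worker releasing its lock on the second job.
  jobs[1].locked_by = nil
  jobs[1].locked_at = nil
end
puts jobs.map(&:locked_by).inspect
```

Without wait: true the method returns while worker-locked jobs may still be running, which is why waiting matters before moving jobs.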
#unhold_jobs! ⇒ Object
# File 'lib/switchman_inst_jobs/switchman/shard.rb', line 40
def unhold_jobs!
  self.jobs_held = false
  if changed?
    save!
    # Wait a little over the 60 second in-process shard cache clearing
    # threshold to ensure that all new jobs are now being enqueued
    # unlocked
    Rails.logger.debug('Waiting for caches to clear')
    sleep(65)
  end
  delayed_jobs_shard.activate(::Delayed::Backend::ActiveRecord::AbstractJob) do
    ::Delayed::Job.where(locked_by: ::Delayed::Backend::Base::ON_HOLD_LOCKED_BY, shard_id: id).
      in_batches(of: 10_000).
      update_all(
        locked_by: nil,
        locked_at: nil,
        attempts: 0,
        failed_at: nil
      )
  end
end
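The final step above releases held jobs in batches and resets their bookkeeping fields. A small in-memory sketch of that batched reset follows, using each_slice in place of ActiveRecord's in_batches; HOLD_MARKER, StoredJob, and release_held are hypothetical names, and the batch size is shrunk for illustration.

```ruby
# Hypothetical hold marker standing in for ON_HOLD_LOCKED_BY.
HOLD_MARKER = "on hold"

StoredJob = Struct.new(:shard_id, :locked_by, :locked_at, :attempts, :failed_at)

# Reset every held job for one shard, batch by batch; returns the batch count.
def release_held(jobs, shard_id:, batch_size: 2)
  held = jobs.select { |job| job.shard_id == shard_id && job.locked_by == HOLD_MARKER }
  batches = 0
  held.each_slice(batch_size) do |batch|
    batch.each do |job|
      job.locked_by = nil
      job.locked_at = nil
      job.attempts = 0
      job.failed_at = nil
    end
    batches += 1
  end
  batches
end

now = Time.now
jobs = [
  StoredJob.new(1, HOLD_MARKER, now, 3, now),
  StoredJob.new(1, HOLD_MARKER, now, 1, nil),
  StoredJob.new(1, HOLD_MARKER, now, 0, nil),
  StoredJob.new(1, "worker:1", now, 2, nil),  # locked by a worker, left alone
  StoredJob.new(2, HOLD_MARKER, now, 5, nil)  # held, but on another shard
]

puts release_held(jobs, shard_id: 1)
puts jobs.count { |job| job.locked_by.nil? }
```

Only jobs that match both the shard id and the hold marker are touched, mirroring the where clause in the method above.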