Class: ActiveJob::Plugins::Resque::Solo::Inspector
- Inherits:
-
Object
- Object
- ActiveJob::Plugins::Resque::Solo::Inspector
- Defined in:
- lib/active_job/plugins/resque/solo/inspector.rb
Class Method Summary collapse
Instance Method Summary collapse
- #around_enqueue(job, block) ⇒ Object
- #extend_lock ⇒ Object
-
#initialize(any_args, only_args, except_args, lock_key_prefix) ⇒ Inspector
constructor
A new instance of Inspector.
- #job(job) ⇒ Object
- #job_args(args) ⇒ Object
- #job_enqueued?(job) ⇒ Boolean
- #job_enqueued_with_args?(job_class, job_arguments, scheduled_job) ⇒ Boolean
- #job_executing?(job) ⇒ Boolean
- #job_with_args_eq?(job_class, job_arguments, wrapper_args) ⇒ Boolean
- #lock_key(job) ⇒ Object
Constructor Details
#initialize(any_args, only_args, except_args, lock_key_prefix) ⇒ Inspector
Returns a new instance of Inspector.
11 12 13 14 15 16 17 18 |
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 11 def initialize(any_args, only_args, except_args, lock_key_prefix) @any_args = !!any_args @only_args = only_args @except_args = except_args || [] # always ignore the ActiveJob symbol hash key. @except_args << "_aj_symbol_keys" unless @except_args.include?("_aj_symbol_keys") @lock_key_prefix = lock_key_prefix.present? ? lock_key_prefix : "ajr_solo" end |
Class Method Details
.resque_present? ⇒ Boolean
20 21 22 |
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 20 def self.resque_present? ActiveJob::Base.queue_adapter == ActiveJob::QueueAdapters::ResqueAdapter end |
Instance Method Details
#around_enqueue(job, block) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 24 def around_enqueue(job, block) if Inspector::resque_present? Lock.try_acquire_release(lock_key(job)) do |lock, extend_at| @lock = lock @extend_lock_at = extend_at if !job_enqueued?(job) && !job_executing?(job) block.call end end else # if resque is not present, always enqueue block.call end end |
#extend_lock ⇒ Object
120 121 122 123 124 |
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 120 def extend_lock if Time.now.utc >= @extend_lock_at @extend_lock_at = @lock.extend end end |
#job(job) ⇒ Object
88 89 90 91 92 93 |
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 88 def job(job) job_arguments = ActiveJob::Arguments.serialize(job.arguments) job_arguments = job_args(job_arguments) job_class = job.class.name [ job_class, job_arguments ] end |
#job_args(args) ⇒ Object
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 95 def job_args(args) if args.present? if @any_args args = [] else args.map do |arg| if arg.is_a? Hash arg.keep_if { |k,v| @only_args.include?(k.to_s) } if @only_args.present? arg.keep_if { |k,v| !@except_args.include?(k.to_s) } if @except_args.present? end arg end end end args end |
#job_enqueued?(job) ⇒ Boolean
41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 41 def job_enqueued?(job) size = ::Resque.size(job.queue_name) return false if size.zero? scheduled_jobs = ::Resque.peek(job.queue_name, 0, 0) extend_lock job_class, job_arguments = job(job) (scheduled_jobs.size-1).downto(0) do |i| scheduled_job = scheduled_jobs[i] return true if job_enqueued_with_args?(job_class, job_arguments, scheduled_job) extend_lock end false end |
#job_enqueued_with_args?(job_class, job_arguments, scheduled_job) ⇒ Boolean
75 76 77 78 |
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 75 def job_enqueued_with_args?(job_class, job_arguments, scheduled_job) args = scheduled_job["args"][0] job_with_args_eq?(job_class, job_arguments, args) end |
#job_executing?(job) ⇒ Boolean
60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 60 def job_executing?(job) job_class, job_arguments = job(job) is_executing = ::Resque.workers.any? do |worker| processing = worker.processing next false if processing.blank? args = processing["payload"]["args"][0] job_with_args_eq?(job_class, job_arguments, args) end extend_lock unless is_executing is_executing end |
#job_with_args_eq?(job_class, job_arguments, wrapper_args) ⇒ Boolean
80 81 82 83 84 85 86 |
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 80 def job_with_args_eq?(job_class, job_arguments, wrapper_args) return false if wrapper_args['job_class'] != job_class encoded_arguments = wrapper_args['arguments'] encoded_arguments = [] if encoded_arguments.nil? encoded_arguments = job_args(encoded_arguments) encoded_arguments == job_arguments end |
#lock_key(job) ⇒ Object
114 115 116 117 118 |
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 114 def lock_key(job) job_class, job_arguments = job(job) sha1 = Digest::SHA1.hexdigest(job_arguments.to_json) "#{@lock_key_prefix}:#{job_class}:#{sha1}" end |