Class: ActiveJob::Plugins::Resque::Solo::Inspector

Inherits:
Object
  • Object
show all
Defined in:
lib/active_job/plugins/resque/solo/inspector.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(any_args, only_args, except_args, lock_key_prefix) ⇒ Inspector

Returns a new instance of Inspector.



11
12
13
14
15
16
17
18
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 11

# Builds an inspector from the uniqueness options.
#
# @param any_args [Object] truthy when arguments are ignored entirely
#   (coerced to a strict boolean)
# @param only_args [Array<String>, nil] hash keys that are the only ones kept
#   when filtering hash arguments
# @param except_args [Array<String>, nil] hash keys removed when filtering
#   hash arguments
# @param lock_key_prefix [String, nil] prefix for the lock key; falls back to
#   "ajr_solo" when blank
def initialize(any_args, only_args, except_args, lock_key_prefix)
  @any_args = !!any_args
  @only_args = only_args
  # Work on a private copy: appending to the caller's array would leak the
  # ActiveJob marker key into shared configuration and would raise on a
  # frozen array.
  @except_args = (except_args || []).dup
  # always ignore the ActiveJob symbol hash key.
  @except_args << "_aj_symbol_keys" unless @except_args.include?("_aj_symbol_keys")
  @lock_key_prefix = lock_key_prefix.present? ? lock_key_prefix : "ajr_solo"
end

Class Method Details

.resque_present? ⇒ Boolean

Returns:

  • (Boolean)


20
21
22
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 20

# Tells whether ActiveJob is configured to use the Resque queue adapter.
#
# `ActiveJob::Base.queue_adapter` may hold either the adapter class itself or
# an instance of it, depending on the ActiveJob version, so both forms are
# accepted.
#
# @return [Boolean]
def self.resque_present?
  adapter = ActiveJob::Base.queue_adapter
  resque_adapter = ActiveJob::QueueAdapters::ResqueAdapter
  adapter == resque_adapter || adapter.is_a?(resque_adapter)
end

Instance Method Details

#around_enqueue(job, block) ⇒ Object



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 24

# Runs the enqueue block only when no equivalent job is already enqueued or
# executing.
#
# @param job [Object] the job about to be enqueued
# @param block [#call] performs the actual enqueue
# @return [Object] whatever `block.call` returns when Resque is not in use,
#   otherwise the result of `Lock.try_acquire_release`
def around_enqueue(job, block)
  # if resque is not present, always enqueue
  return block.call unless Inspector.resque_present?

  Lock.try_acquire_release(lock_key(job)) do |acquired_lock, extend_at|
    # Remember the lock and its next extension time so extend_lock can use them.
    @lock = acquired_lock
    @extend_lock_at = extend_at

    block.call unless job_enqueued?(job) || job_executing?(job)
  end
end

#extend_lock ⇒ Object



120
121
122
123
124
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 120

# Extends the held lock once the current UTC time has reached
# @extend_lock_at, and stores the value returned by `@lock.extend` as the
# next extension time.
#
# @return [Object, nil] the new extension time, or nil when it is not yet due
def extend_lock
  return if Time.now.utc < @extend_lock_at

  @extend_lock_at = @lock.extend
end

#job(job) ⇒ Object



88
89
90
91
92
93
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 88

# Reduces a job to the pair used for comparison: its class name and its
# serialized arguments after filtering through job_args.
#
# @param job [Object] object responding to `arguments`
# @return [Array] two elements: the job class name and the filtered arguments
def job(job)
  serialized = ActiveJob::Arguments.serialize(job.arguments)
  filtered = job_args(serialized)
  [job.class.name, filtered]
end

#job_args(args) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 95

# Reduces an argument list to the parts that take part in comparison.
#
# * Blank input (nil or empty) is returned unchanged.
# * When @any_args is set, arguments are ignored and an empty list is returned.
# * Otherwise each Hash argument is filtered by key name (compared as
#   strings): only keys listed in @only_args are kept when that list is
#   present, and keys listed in @except_args are dropped. Non-Hash arguments
#   pass through untouched.
#
# The input array and its hashes are never modified; filtered copies are
# returned instead.
#
# @param args [Array, nil] serialized job arguments
# @return [Array, nil] the filtered arguments
def job_args(args)
  return args unless args.present?
  return [] if @any_args

  args.map do |arg|
    next arg unless arg.is_a?(Hash)

    arg.reject do |key, _value|
      name = key.to_s
      (@only_args.present? && !@only_args.include?(name)) ||
        (@except_args.present? && @except_args.include?(name))
    end
  end
end

#job_enqueued?(job) ⇒ Boolean

Returns:

  • (Boolean)


41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 41

# Checks whether an equivalent job is already waiting in the job's queue.
#
# Entries are scanned from the last one returned by Resque.peek back to the
# first. The lock is extended (when due) before the scan and after every
# non-matching entry.
#
# @param job [Object] job responding to `queue_name`
# @return [Boolean]
def job_enqueued?(job)
  return false if ::Resque.size(job.queue_name).zero?

  # NOTE(review): a count of 0 presumably makes peek return the whole queue —
  # confirm against the Resque version in use.
  pending_jobs = ::Resque.peek(job.queue_name, 0, 0)

  extend_lock

  job_class, job_arguments = job(job)

  pending_jobs.reverse_each do |scheduled_job|
    return true if job_enqueued_with_args?(job_class, job_arguments, scheduled_job)

    extend_lock
  end

  false
end

#job_enqueued_with_args?(job_class, job_arguments, scheduled_job) ⇒ Boolean

Returns:

  • (Boolean)


75
76
77
78
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 75

# Compares a queued Resque payload against the given job class and arguments.
# The first element of the payload's "args" is passed on as the wrapper hash.
#
# @param job_class [String] class name of the job being enqueued
# @param job_arguments [Array] filtered arguments of the job being enqueued
# @param scheduled_job [Hash] queued payload with an "args" entry
# @return [Boolean]
def job_enqueued_with_args?(job_class, job_arguments, scheduled_job)
  job_with_args_eq?(job_class, job_arguments, scheduled_job["args"][0])
end

#job_executing?(job) ⇒ Boolean

Returns:

  • (Boolean)


60
61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 60

# Checks whether any Resque worker is currently processing an equivalent job.
# When none is, the lock is extended if due.
#
# @param job [Object] the job about to be enqueued
# @return [Boolean]
def job_executing?(job)
  job_class, job_arguments = job(job)

  executing = ::Resque.workers.any? do |worker|
    current = worker.processing

    if current.blank?
      # This worker has nothing in progress.
      false
    else
      job_with_args_eq?(job_class, job_arguments, current["payload"]["args"][0])
    end
  end

  extend_lock unless executing

  executing
end

#job_with_args_eq?(job_class, job_arguments, wrapper_args) ⇒ Boolean

Returns:

  • (Boolean)


80
81
82
83
84
85
86
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 80

# Tells whether a queued/executing wrapper payload describes the same job
# class with the same (filtered) arguments.
#
# @param job_class [String] class name of the job being enqueued
# @param job_arguments [Array] filtered arguments of the job being enqueued
# @param wrapper_args [Hash, Object] first payload argument; expected to be a
#   Hash with 'job_class' and 'arguments' keys
# @return [Boolean] false when the payload is not a Hash, names another class,
#   or its filtered arguments differ
def job_with_args_eq?(job_class, job_arguments, wrapper_args)
  # The first payload argument may be nil or a non-Hash value (e.g. a job with
  # no arguments or a different payload shape); such a payload can never match.
  return false unless wrapper_args.is_a?(Hash)
  return false if wrapper_args['job_class'] != job_class

  encoded_arguments = wrapper_args['arguments']
  encoded_arguments = [] if encoded_arguments.nil?
  job_args(encoded_arguments) == job_arguments
end

#lock_key(job) ⇒ Object



114
115
116
117
118
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 114

# Builds the lock key for a job: the configured prefix, the job class name and
# a SHA1 hex digest of the JSON-encoded filtered arguments, joined by colons.
#
# @param job [Object] the job about to be enqueued
# @return [String]
def lock_key(job)
  job_class, job_arguments = job(job)
  arguments_digest = Digest::SHA1.hexdigest(job_arguments.to_json)
  format("%s:%s:%s", @lock_key_prefix, job_class, arguments_digest)
end