Class: ActiveJob::Plugins::Resque::Solo::Inspector

Inherits:
Object
  • Object
show all
Defined in:
lib/active_job/plugins/resque/solo/inspector.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(only_args, except_args) ⇒ Inspector



7
8
9
10
11
12
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 7

def initialize(only_args, except_args)
  @only_args = only_args
  @except_args = except_args || []
  # always ignore the ActiveJob symbol hash key.
  @except_args << "_aj_symbol_keys" unless @except_args.include?("_aj_symbol_keys")
end

Class Method Details

.resque_present?Boolean



14
15
16
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 14

def self.resque_present?
  ActiveJob::Base.queue_adapter == ActiveJob::QueueAdapters::ResqueAdapter
end

Instance Method Details

#around_enqueue(job, block) ⇒ Object



18
19
20
21
22
23
24
25
26
27
28
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 18

def around_enqueue(job, block)
  if Inspector::resque_present?

    if !job_enqueued?(job) && !job_executing?(job)
      block.call
    end
  else
    # if resque is not present, always enqueue
    block.call
  end
end

#job(job) ⇒ Object



79
80
81
82
83
84
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 79

def job(job)
  job_arguments = ActiveJob::Arguments.serialize(job.arguments)
  job_arguments = job_args(job_arguments)
  job_class = job.class.name
  [ job_class, job_arguments ]
end

#job_args(args) ⇒ Object



86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 86

def job_args(args)
  if args.present?
    args.map do |arg|
      if arg.is_a? Hash
        arg.keep_if { |k,v| @only_args.include?(k.to_s) } if @only_args.present?
        arg.keep_if { |k,v| !@except_args.include?(k.to_s) } if @except_args.present?
      end

      arg
    end
  end

  args
end

#job_enqueued?(job) ⇒ Boolean



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 30

def job_enqueued?(job)
  size = ::Resque.size(job.queue_name)
  return false if size.zero?

  page_size = 250
  pages = (size/page_size).to_i + 1
  jobs = []

  # It's possible for this loop to skip jobs if they
  # are dequeued while the loop is in progress.
  (0..pages).each do |i|
    page_start = i * page_size
    page = ::Resque.peek(job.queue_name, page_start, page_size)
    break if page.empty?
    jobs += page
  end

  job_class, job_arguments = job(job)

  (jobs.size-1).downto(0) do |i|
    scheduled_job = jobs[i]
    return true if job_enqueued_with_args?(job_class, job_arguments, scheduled_job)
  end
  false
end

#job_enqueued_with_args?(job_class, job_arguments, scheduled_job) ⇒ Boolean



67
68
69
70
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 67

def job_enqueued_with_args?(job_class, job_arguments, scheduled_job)
  args = scheduled_job["args"][0]
  job_with_args_eq?(job_class, job_arguments, args)
end

#job_executing?(job) ⇒ Boolean



56
57
58
59
60
61
62
63
64
65
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 56

def job_executing?(job)
  job_class, job_arguments = job(job)

  ::Resque.workers.any? do |worker|
    processing = worker.processing
    next false if processing.blank?
    args = processing["payload"]["args"][0]
    job_with_args_eq?(job_class, job_arguments, args)
  end
end

#job_with_args_eq?(job_class, job_arguments, wrapper_args) ⇒ Boolean



72
73
74
75
76
77
# File 'lib/active_job/plugins/resque/solo/inspector.rb', line 72

def job_with_args_eq?(job_class, job_arguments, wrapper_args)
  return false if wrapper_args['job_class'] != job_class
  encoded_arguments = wrapper_args['arguments']
  encoded_arguments = job_args(encoded_arguments)
  encoded_arguments == job_arguments
end