Module: Resque::Plugins::ConcurrentRestriction::Job

Defined in:
lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.extended(receiver) ⇒ Object



40
41
42
43
44
45
# File 'lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb', line 40

# Hook run with the object that extends this module. On that object's
# singleton class it keeps the current +reserve+ reachable under the name
# +reserve_without_restriction+, then points +reserve+ at
# +reserve_with_restriction+.
def self.extended(receiver)
  receiver.singleton_class.class_eval do
    # Order matters: capture the original before overwriting the name.
    alias reserve_without_restriction reserve
    alias reserve reserve_with_restriction
  end
end

Instance Method Details

#get_queued_job(queue) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb', line 69

# Reserves the next job from +queue+ through the unwrapped reserve
# (+reserve_without_restriction+), retrying when the reserved job gets
# stashed because it is restricted.
#
# At most ConcurrentRestriction.reserve_queued_job_attempts reservations are
# made per call.
#
# Returns the reserved job, or nil when the queue yields no job or every
# attempt ended with the job being stashed.
def get_queued_job(queue)
  # Bounded retry
  1.upto(ConcurrentRestriction.reserve_queued_job_attempts) do |_attempt|
    resque_job = reserve_without_restriction(queue)

    # Short-circuit if a job was not found
    return if resque_job.nil?

    # If there is a job on regular queues, then only run it if it's not restricted
    job_class = resque_job.payload_class

    # Return to work on job if not a restricted job
    return resque_job unless job_class.is_a?(ConcurrentRestriction)

    # Keep trying if job is restricted (it was stashed). If job is runnable,
    # we keep the lock until done_working
    return resque_job unless job_class.stash_if_restricted(resque_job)
  end

  # Safety net, here in case we hit the upper bound and there are still queued items
  nil
end

#get_restricted_job(queue) ⇒ Object



62
63
64
65
66
67
# File 'lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb', line 62

# Asks ConcurrentRestrictionJob for the next runnable job on the restriction
# queues for +queue+ and returns whatever it hands back.
def get_restricted_job(queue)
  # Finding a runnable job also acquires a restriction lock, which is
  # released in done_working
  ConcurrentRestrictionJob.next_runnable_job(queue)
end

#reserve_with_restriction(queue) ⇒ Object

Wrap reserve so we can move a job to the restriction queue if it is restricted. This needs to be a class method.



49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/resque/plugins/concurrent_restriction/resque_worker_extension.rb', line 49

# Reserve wrapper that consults two job sources for +queue+:
# +get_queued_job+ and +get_restricted_job+. The queued lookup runs first
# unless ConcurrentRestriction.restricted_before_queued is set, in which case
# the order is flipped.
#
# Returns the first truthy result, otherwise the result of the last lookup,
# so the caller can move on to the next queue.
def reserve_with_restriction(queue)
  lookups = %i[get_queued_job get_restricted_job]
  lookups = lookups.reverse if ConcurrentRestriction.restricted_before_queued

  # Once a job has been found, later lookups are skipped entirely.
  lookups.reduce(nil) { |found, lookup| found || send(lookup, queue) }
end