Class: ScaleWorkers::Adapter::DelayedJobActiveRecord

Inherits:
Object
  • Object
show all
Includes:
Utils
Defined in:
lib/scale_workers/adapter/delayed_job_active_record.rb

Defined Under Namespace

Modules: Command

Constant Summary collapse

PRIME =
97
TIMEOUT =
5

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utils

#say

Constructor Details

#initializeDelayedJobActiveRecord

Returns a new instance of DelayedJobActiveRecord.



18
19
20
# File 'lib/scale_workers/adapter/delayed_job_active_record.rb', line 18

def initialize
  @pid_switch = 0
end

Instance Attribute Details

#pid_switchObject

Returns the value of attribute pid_switch.



16
17
18
# File 'lib/scale_workers/adapter/delayed_job_active_record.rb', line 16

def pid_switch
  @pid_switch
end

Instance Method Details

#count_procedureObject



42
43
44
# File 'lib/scale_workers/adapter/delayed_job_active_record.rb', line 42

def count_procedure
  lambda {|queue, max_failure| Delayed::Job.where(queue: queue).where("attempts <= ?", max_failure).count}
end

#dj_call(command, queue, pid_dir, count) ⇒ Object



54
55
56
57
# File 'lib/scale_workers/adapter/delayed_job_active_record.rb', line 54

def dj_call(command, queue, pid_dir, count)
  say('DJ call')
  `cd #{Rails.root.to_s}; RAILS_ENV=#{Rails.env} #{::ScaleWorkers.configuration.worker_executable_path} --queue='#{queue}' -p'#{queue}' --pid-dir=#{pid_dir} -n#{count} #{command}`
end

#increment_pid_switchObject



46
47
48
# File 'lib/scale_workers/adapter/delayed_job_active_record.rb', line 46

def increment_pid_switch
  self.pid_switch = (self.pid_switch + 1) % PRIME
end

#pid_dir(queue) ⇒ Object



50
51
52
# File 'lib/scale_workers/adapter/delayed_job_active_record.rb', line 50

def pid_dir(queue)
  File.join(Rails.root.to_s, "tmp", "pids", "#{queue}.#{self.pid_switch}")
end

#start_procedureObject



32
33
34
35
36
37
38
39
40
# File 'lib/scale_workers/adapter/delayed_job_active_record.rb', line 32

def start_procedure
  lambda do |queue, count|
    say('Inside start procedure')
    adapter = self #::ScaleWorkers.configuration.adapter
    adapter.increment_pid_switch
    pid_dir = adapter.pid_dir(queue)
    adapter.dj_call(adapter.class::Command::START, queue, pid_dir, count)
  end
end

#stop_procedureObject



22
23
24
25
26
27
28
29
30
# File 'lib/scale_workers/adapter/delayed_job_active_record.rb', line 22

def stop_procedure
  lambda do |queue, count|
    say('Inside stop procedure')
    adapter = self#::ScaleWorkers.configuration.adapter
    pid_dir = adapter.pid_dir(queue)
    adapter.dj_call(adapter.class::Command::STOP, queue, pid_dir, count)
    sleep(adapter.class::TIMEOUT)
  end
end