Module: Delayed::JobLauncher

Included in:
Worker
Defined in:
lib/delayed/job_launcher.rb

Constant Summary collapse

MAX_ACTIVE_JOBS =
50

Instance Method Summary collapse

Instance Method Details

#check_thread_sanityObject

Sanity check of dead threads for precaution, but probably won’t be necessary



83
84
85
86
87
88
89
90
# File 'lib/delayed/job_launcher.rb', line 83

def check_thread_sanity
  each_job_in_execution do |job, started_at, thread|
    unless thread.alive?
      log "Dead thread? Terminate it!, This should not be happening"
      thread.terminate
    end
  end
end

#initialize_launcher(max_active_jobs = MAX_ACTIVE_JOBS) ⇒ Object

Initialize the launcher, you can specified the maximun number of jobs executing in parallel, by default MAX_ACTIVE_JOBS constant

The launcher has a hash with the following structure: {}

|- id
|   `{}
|     |-:thread     => Thread
|     |-:job        => Delayed::Job
|     `-:started_at => Time
`-...

If group_by specified an ActiveRecord::Base object, id will be the primary key of those objects.



34
35
36
37
38
# File 'lib/delayed/job_launcher.rb', line 34

def initialize_launcher(max_active_jobs=MAX_ACTIVE_JOBS)
  @max_active_jobs = max_active_jobs
  @jobs = {}
  @mutex = Mutex.new
end

#jobs_ids_in_executionObject



97
98
99
100
101
# File 'lib/delayed/job_launcher.rb', line 97

def jobs_ids_in_execution
  ret = []
  each_job_in_execution {|job, x, xx| ret << job.id }
  ret
end

#jobs_in_executionObject

Number of jobs executing right now



93
94
95
# File 'lib/delayed/job_launcher.rb', line 93

def jobs_in_execution
  @jobs.size
end

#kill_threads!Object



103
104
105
106
107
108
# File 'lib/delayed/job_launcher.rb', line 103

def kill_threads!
  each_job_in_execution do |job, started_at, thread|
    log "Killing #{job.name}"
    thread.terminate
  end
end

#launch(job) ⇒ Object

Launch the job in a thread and register it. Returns whether the job has been launched or not.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/delayed/job_launcher.rb', line 42

def launch(job)
  return false unless can_execute job
  s = Semaphore.new
  t = Thread.new do
    log "##{Thread.current.object_id} before wait"
    s.wait
    log "###{Thread.current.object_id} after wait"
    begin
      job.run_with_lock Job::MAX_RUN_TIME, name
    rescue Exception => e
      log "ERROR: #{e}"
    ensure
      unregister_job job
    end
  end
  register_job job, t
  s.signal
  log "Launched job #{job.name}, there are #{jobs_in_execution} jobs in execution"
  return true
end

#report_jobs_stateObject

Print information about the current state to stdout



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/delayed/job_launcher.rb', line 64

def report_jobs_state
  if jobs_in_execution > 0
    margin = 20
    title = "Jobs In Execution"
    log "#{'='*margin} #{title} #{'='*margin} "
    log " There are #{jobs_in_execution} jobs running."
    each_job_in_execution do |job, started_at, thread|
      duration = Duration.new(Time.now - started_at)
      log "\tJob #{job.id}: #{job.name}"
      log "\t   Running on #{thread} (#{thread.status}) for #{duration}"
    end
    log "#{'=' * (margin * 2 + title.size + 2)} "
  else
    log "\n\tThere is no jobs in execution right now!"
  end
end