Module: Delayed::JobLauncher
- Included in: Worker
- Defined in: lib/delayed/job_launcher.rb
Constant Summary
- MAX_ACTIVE_JOBS = 50
Instance Method Summary
- #check_thread_sanity ⇒ Object
  Precautionary check for dead threads; probably not necessary.
- #initialize_launcher(max_active_jobs = MAX_ACTIVE_JOBS) ⇒ Object
  Initialize the launcher. You can specify the maximum number of jobs executing in parallel; it defaults to the MAX_ACTIVE_JOBS constant.
- #jobs_ids_in_execution ⇒ Object
- #jobs_in_execution ⇒ Object
  Number of jobs executing right now.
- #kill_threads! ⇒ Object
- #launch(job) ⇒ Object
  Launch the job in a thread and register it.
- #report_jobs_state ⇒ Object
  Print information about the current state to stdout.
Instance Method Details
#check_thread_sanity ⇒ Object
Precautionary check for dead threads; probably not necessary.
    # File 'lib/delayed/job_launcher.rb', line 83
    def check_thread_sanity
      each_job_in_execution do |job, started_at, thread|
        unless thread.alive?
          log "Dead thread? Terminate it!, This should not be happening"
          thread.terminate
        end
      end
    end
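To illustrate the check above, here is a minimal sketch of how `Thread#alive?` distinguishes finished threads from running ones. The `jobs` hash is a hypothetical stand-in for the launcher's registry, not the library's actual data. Calling `terminate` on a thread that has already finished has no effect, which is why the method above is harmless even when it fires.

```ruby
# Hypothetical registry: id => { thread:, started_at: }
jobs = {
  1 => { thread: Thread.new { :done }, started_at: Time.now },
  2 => { thread: Thread.new { sleep }, started_at: Time.now }
}

# Let the first thread finish so that it is no longer alive.
jobs[1][:thread].join

# Collect the ids whose thread has already died.
dead_ids = jobs.reject { |_id, entry| entry[:thread].alive? }.keys

# Terminating a dead thread is a no-op.
dead_ids.each { |id| jobs[id][:thread].terminate }

# Clean up the still-sleeping thread so the example exits promptly.
jobs[2][:thread].terminate
jobs[2][:thread].join
```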
#initialize_launcher(max_active_jobs = MAX_ACTIVE_JOBS) ⇒ Object
Initialize the launcher. You can specify the maximum number of jobs executing in parallel; it defaults to the MAX_ACTIVE_JOBS constant.
The launcher has a hash with the following structure: {}
|- id
| `{}
| |-:thread => Thread
| |-:job => Delayed::Job
| `-:started_at => Time
`-...
If group_by specifies an ActiveRecord::Base object, id will be the primary key of that object.
    # File 'lib/delayed/job_launcher.rb', line 34
    def initialize_launcher(max_active_jobs=MAX_ACTIVE_JOBS)
      @max_active_jobs = max_active_jobs
      @jobs = {}
      @mutex = Mutex.new
    end
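The hash structure described above can be sketched as a small standalone registry. This is an illustration only: `JobRegistry`, `FakeJob`, and the methods `register`, `unregister`, `size`, `ids`, and `full?` are hypothetical names, and the real module's private helpers may behave differently.

```ruby
# Hypothetical stand-in for Delayed::Job; only id and name are modelled.
FakeJob = Struct.new(:id, :name)

# Assumed shape of the launcher state: id => { thread:, job:, started_at: }
class JobRegistry
  def initialize(max_active_jobs = 50)
    @max_active_jobs = max_active_jobs
    @jobs = {}
    @mutex = Mutex.new # guards @jobs against concurrent access
  end

  def register(job, thread)
    @mutex.synchronize do
      @jobs[job.id] = { thread: thread, job: job, started_at: Time.now }
    end
  end

  def unregister(job)
    @mutex.synchronize { @jobs.delete(job.id) }
  end

  def size
    @mutex.synchronize { @jobs.size }
  end

  def ids
    @mutex.synchronize { @jobs.keys }
  end

  # True once the configured parallelism limit has been reached.
  def full?
    size >= @max_active_jobs
  end
end

registry = JobRegistry.new(2)
job = FakeJob.new(7, "send_email")
registry.register(job, Thread.current)
```

Keying the hash by id keeps lookup and removal cheap, and the mutex matters because worker threads remove their own entries while the launcher thread adds new ones.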
#jobs_ids_in_execution ⇒ Object
    # File 'lib/delayed/job_launcher.rb', line 97
    def jobs_ids_in_execution
      ret = []
      each_job_in_execution {|job, x, xx| ret << job.id }
      ret
    end
#jobs_in_execution ⇒ Object
Number of jobs executing right now
    # File 'lib/delayed/job_launcher.rb', line 93
    def jobs_in_execution
      @jobs.size
    end
#kill_threads! ⇒ Object
    # File 'lib/delayed/job_launcher.rb', line 103
    def kill_threads!
      each_job_in_execution do |job, started_at, thread|
        log "Killing #{job.name}"
        thread.terminate
      end
    end
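One point worth illustrating: in Ruby, `Thread#terminate` still runs any `ensure` clauses in the killed thread, so cleanup written in an `ensure` block is not skipped when a thread is killed. The sketch below shows this in isolation; the variable names are illustrative and not part of the library.

```ruby
require "thread"

cleaned_up = false
started = Queue.new

thread = Thread.new do
  begin
    started.push(:ready) # tell the main thread we are inside the begin block
    sleep                # simulate a long-running job
  ensure
    cleaned_up = true    # runs even when the thread is terminated
  end
end

started.pop       # wait until the worker is inside the protected region
thread.terminate  # same effect as Thread#kill
thread.join       # wait for the ensure clause to finish
```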
#launch(job) ⇒ Object
Launch the job in a thread and register it. Returns whether the job has been launched or not.
    # File 'lib/delayed/job_launcher.rb', line 42
    def launch(job)
      return false unless can_execute job
      s = Semaphore.new
      t = Thread.new do
        log "##{Thread.current.object_id} before wait"
        s.wait
        log "###{Thread.current.object_id} after wait"
        begin
          job.run_with_lock Job::MAX_RUN_TIME, name
        rescue Exception => e
          log "ERROR: #{e}"
        ensure
          unregister_job job
        end
      end
      register_job job, t
      s.signal
      log "Launched job #{job.name}, there are #{jobs_in_execution} jobs in execution"
      return true
    end
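The wait/signal handshake in `launch` appears intended to keep the new thread from running the job before it has been registered, so that the `ensure` clause never tries to remove an entry that does not exist yet. Ruby's core library does not provide a `Semaphore` class, so the one used above presumably comes from elsewhere in the project. The following sketch reproduces the same ordering with a `Queue` as a one-shot gate; this substitution, and the names `registry`, `gate`, and `events`, are assumptions made for illustration.

```ruby
require "thread"

registry = {}
mutex = Mutex.new
gate = Queue.new # stand-in for the Semaphore used in the source
events = []

worker = Thread.new do
  gate.pop # block until the launcher has registered this thread
  begin
    mutex.synchronize { events << :work_started }
    # the job's work would run here
  ensure
    mutex.synchronize do
      registry.delete(:job_1)
      events << :unregistered
    end
  end
end

# Register first, then release the worker.
mutex.synchronize do
  registry[:job_1] = worker
  events << :registered
end
gate.push(:go)
worker.join
```

Because the worker blocks on `gate.pop` until after registration, the three events are always recorded in the order registered, started, unregistered.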
#report_jobs_state ⇒ Object
Print information about the current state to stdout
    # File 'lib/delayed/job_launcher.rb', line 64
    def report_jobs_state
      if jobs_in_execution > 0
        margin = 20
        title = "Jobs In Execution"
        log "#{'='*margin} #{title} #{'='*margin} "
        log " There are #{jobs_in_execution} jobs running."
        each_job_in_execution do |job, started_at, thread|
          duration = Duration.new(Time.now - started_at)
          log "\tJob #{job.id}: #{job.name}"
          log "\t Running on #{thread} (#{thread.status}) for #{duration}"
        end
        log "#{'=' * (margin * 2 + title.size + 2)} "
      else
        log "\n\tThere is no jobs in execution right now!"
      end
    end
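The banner arithmetic in `report_jobs_state` can be checked in isolation: the closing rule is sized as `margin * 2 + title.size + 2` so that it matches the header, which consists of two margins, the title, and two separating spaces. The sketch below verifies that, ignoring the trailing space in the logged strings. `Duration` is not part of Ruby's standard library, so `format_elapsed` is a hypothetical helper standing in for it.

```ruby
margin = 20
title = "Jobs In Execution"

header = "#{'=' * margin} #{title} #{'=' * margin}"
footer = '=' * (margin * 2 + title.size + 2)

# Hypothetical replacement for Duration: formats seconds as HH:MM:SS.
def format_elapsed(seconds)
  total = seconds.to_i
  format("%02d:%02d:%02d", total / 3600, (total % 3600) / 60, total % 60)
end

puts header
puts "\tJob 7: send_email"
puts "\t Running for #{format_elapsed(3725.4)}"
puts footer
```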