Class: JobBoss::Boss
- Inherits:
- Object
- Object
- JobBoss::Boss
- Extended by:
- ActiveSupport::Memoizable
- Defined in:
- lib/job_boss/boss.rb
Class Method Summary
- .config ⇒ Object
Used to set Boss configuration. Usage: Boss.config.sleep_interval = 2
- .queue(attributes = {}) ⇒ Object
Used to queue jobs. Usage: Boss.queue.math.is_prime?(42)
- .queue_path(path, *args) ⇒ Object
Used to queue jobs by path. Usage: Boss.queue_path('math#is_prime?', 42)
- .resolve_path(path) ⇒ Object
If path starts with '/', leave it alone.
Instance Method Summary
- #config ⇒ Object
- #dequeue_jobs ⇒ Object
Dequeues the next set of jobs using a prioritized round-robin algorithm. The priority of a queue determines how many jobs are pulled from that queue on each dequeue. A priority adjustment also gives greater priority to sets of jobs which have been running longer.
- #initialize(options = {}) ⇒ Boss (constructor)
A new instance of Boss.
- #logger ⇒ Object
- #start ⇒ Object
Start the boss.
- #stop ⇒ Object
- #wait_for_available_employees ⇒ Object
Waits until there is at least one available employee, then returns the count.
- #wait_for_jobs ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Boss
Returns a new instance of Boss.
# File 'lib/job_boss/boss.rb', line 58
def initialize(options = {})
  config.application_root ||= options[:working_dir]
  config.sleep_interval ||= options[:sleep_interval]
  config.employee_limit ||= options[:employee_limit]
  config.database_yaml_path ||= options[:database_yaml_path]
  config.jobs_path ||= options[:jobs_path]

  @running_jobs = []
end
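The constructor uses `||=`, so an option only fills in a setting that is still unset. The following is a minimal sketch of that behaviour, not the library's code: `SketchConfig` and `apply_options` are hypothetical stand-ins, and it assumes the configuration object persists between calls, which the listing here does not show.

```ruby
# Hypothetical stand-in for the Boss configuration object (assumption).
SketchConfig = Struct.new(:sleep_interval, :employee_limit)

# Mirrors the ||= pattern from the constructor: an option is applied
# only when the corresponding setting is nil (or false).
def apply_options(config, options = {})
  config.sleep_interval ||= options[:sleep_interval]
  config.employee_limit ||= options[:employee_limit]
  config
end

config = SketchConfig.new
config.sleep_interval = 2 # set beforehand, as Boss.config.sleep_interval = 2 would

apply_options(config, { sleep_interval: 5, employee_limit: 4 })

p config.sleep_interval # 2 (the earlier value wins)
p config.employee_limit # 4 (was unset, so the option is used)
```

Under this reading, explicit configuration takes precedence over constructor options.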
Class Method Details
.config ⇒ Object
Used to set Boss configuration. Usage:
Boss.config.sleep_interval = 2
# File 'lib/job_boss/boss.rb', line 12
def config
  require 'job_boss/config'
  Config.new
end
.queue(attributes = {}) ⇒ Object
Used to queue jobs. Usage:
Boss.queue.math.is_prime?(42)
# File 'lib/job_boss/boss.rb', line 21
def queue(attributes = {})
  require 'job_boss/queuer'
  Queuer.new(attributes)
end
.queue_path(path, *args) ⇒ Object
Used to queue jobs by path. Usage:
Boss.queue_path('math#is_prime?', 42)
# File 'lib/job_boss/boss.rb', line 30
def queue_path(path, *args)
  controller, action = path.split('#')

  queue.send(controller).send(action, *args)
end
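To illustrate how the path string is turned into a chained call, here is a minimal sketch that runs without the gem. `RecordingQueuer` is a hypothetical stand-in for whatever `Boss.queue` returns: it only records the calls it receives. The `queue_path` helper copies the method body above but takes the queuer as an explicit argument.

```ruby
# Hypothetical stand-in for the object returned by Boss.queue (assumption).
# It records each call instead of writing a job to the database.
class RecordingQueuer
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Record any unknown method and return self so calls can be chained.
  def method_missing(name, *args)
    @calls << [name, args]
    self
  end

  def respond_to_missing?(_name, _include_private = false)
    true
  end
end

# Same logic as Boss.queue_path, with the queuer passed in explicitly.
def queue_path(queuer, path, *args)
  controller, action = path.split('#')
  queuer.send(controller).send(action, *args)
end

queuer = RecordingQueuer.new
queue_path(queuer, 'math#is_prime?', 42)
p queuer.calls # [[:math, []], [:is_prime?, [42]]]
```

The part before `#` becomes the first call and the part after it becomes the second, which receives the remaining arguments.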
.resolve_path(path) ⇒ Object
If path starts with '/', leave it alone. Otherwise, prepend application_root.
# File 'lib/job_boss/boss.rb', line 37
def resolve_path(path)
  if path == ?/ || path.match(/^#{config.application_root}/)
    path
  else
    File.join(config.application_root, path)
  end
end
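The description says that any path starting with '/' is left alone, whereas the listed condition compares the whole path against `?/` (on Ruby 1.9 and later that is the one-character string "/"). The sketch below illustrates the documented intent only. `resolve_path_sketch` is a hypothetical standalone helper, and the root is passed in as an argument instead of being read from `config.application_root`.

```ruby
# Hypothetical helper illustrating the documented behaviour (assumption:
# "starts with '/'" is meant literally, so String#start_with? is used).
def resolve_path_sketch(path, application_root)
  # Absolute paths are returned unchanged.
  return path if path.start_with?('/')

  # Relative paths are anchored at the application root.
  File.join(application_root, path)
end

puts resolve_path_sketch('/var/log/boss.log', '/srv/app') # /var/log/boss.log
puts resolve_path_sketch('log/boss.log', '/srv/app')      # /srv/app/log/boss.log
```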
Instance Method Details
#dequeue_jobs ⇒ Object
Dequeues the next set of jobs using a prioritized round-robin algorithm. The priority of a queue determines how many jobs are pulled from that queue on each dequeue. A priority adjustment also gives greater priority to sets of jobs which have been running longer.
# File 'lib/job_boss/boss.rb', line 127
def dequeue_jobs
  Job.pending.select('DISTINCT priority, path, batch_id').reorder(nil).collect do |distinct_job|
    queue_scope = Job.where(:path => distinct_job.path, :batch_id => distinct_job.batch_id)

    # Give queues which are further along more priority to reduce latency
    priority_adjustment = ((queue_scope.completed.count.to_f / queue_scope.count) * config.employee_limit).floor

    queue_scope.pending.limit(distinct_job.priority + priority_adjustment)
  end.flatten.sort_by(&:id)
end
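The per-queue limit is plain arithmetic, so it can be worked through without a database. The sketch below reproduces the formula from the listing. `dequeue_limit` is a hypothetical helper name, and the counts are made-up inputs chosen for illustration.

```ruby
# Hypothetical helper reproducing the limit arithmetic from dequeue_jobs:
#   limit = priority + floor((completed / total) * employee_limit)
def dequeue_limit(priority, completed, total, employee_limit)
  adjustment = ((completed.to_f / total) * employee_limit).floor
  priority + adjustment
end

# Illustrative inputs: batches of 10 jobs, employee_limit of 4.
puts dequeue_limit(1, 0, 10, 4) # 1 (nothing completed, no adjustment)
puts dequeue_limit(1, 5, 10, 4) # 3 (half done, adjustment of 2)
puts dequeue_limit(2, 9, 10, 4) # 5 (nearly done, adjustment of 3)
```

A batch that is nearly finished can pull up to almost `employee_limit` extra jobs per dequeue, which is how the adjustment lets it finish sooner.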
#logger ⇒ Object
# File 'lib/job_boss/boss.rb', line 50
def logger
  config.log_path = Boss.resolve_path(config.log_path)

  require 'logger'
  Logger.new(config.log_path)
end
#start ⇒ Object
Start the boss.
# File 'lib/job_boss/boss.rb', line 69
def start
  require 'active_record'
  require 'yaml'

  establish_active_record_connection
  logger.info "Started ActiveRecord connection in '#{config.environment}' environment from database YAML: #{config.database_yaml_path}"

  require_job_classes

  Signal.trap("HUP") do
    stop
  end

  at_exit do
    stop if Process.pid == BOSS_PID
  end

  logger.info "Job Boss started"
  logger.info "Employee limit: #{Boss.config.employee_limit}"

  jobs = []
  while true
    available_employee_count = wait_for_available_employees

    if jobs.empty?
      jobs = wait_for_jobs

      jobs = jobs.to_a.select(&:mark_as_started)
    end

    [available_employee_count, jobs.size].min.times do
      job = jobs.shift
      job.dispatch(self)
      @running_jobs << job
    end
  end
end
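The inner part of the main loop hands out no more jobs than there are free employees and leaves the remainder in `jobs` for the next pass. Here is a minimal sketch of that step in isolation. `dispatch_batch` is a hypothetical helper, and symbols stand in for job records.

```ruby
# Hypothetical helper isolating the dispatch step of the main loop:
# take at most `available` jobs from the front of the batch.
def dispatch_batch(jobs, available)
  dispatched = []
  [available, jobs.size].min.times do
    dispatched << jobs.shift # shift removes the job from the pending batch
  end
  dispatched
end

jobs = [:a, :b, :c]
p dispatch_batch(jobs, 2) # [:a, :b]
p jobs                    # [:c]
p dispatch_batch(jobs, 2) # [:c]
p jobs                    # []
```

Because the batch is only refilled once it is empty, jobs already marked as started are dispatched before new ones are dequeued.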
#stop ⇒ Object
# File 'lib/job_boss/boss.rb', line 138
def stop
  logger.info "Stopping #{@running_jobs.size} running employees..."

  shutdown_running_jobs

  logger.info "Job Boss stopped"
end
#wait_for_available_employees ⇒ Object
Waits until there is at least one available employee, then returns the count.
# File 'lib/job_boss/boss.rb', line 108
def wait_for_available_employees
  until (employee_count = available_employees) > 0
    sleep(config.sleep_interval)
  end

  employee_count
end
#wait_for_jobs ⇒ Object
# File 'lib/job_boss/boss.rb', line 116
def wait_for_jobs
  while (jobs = dequeue_jobs).empty?
    sleep(config.sleep_interval)
  end

  jobs
end
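Both `wait_for_available_employees` and `wait_for_jobs` follow the same poll-and-sleep pattern: check, sleep for `sleep_interval`, check again. The sketch below shows that pattern on its own. `poll_until_present` is a hypothetical helper, not part of the library, and the block simulates a queue that becomes non-empty on the third check.

```ruby
# Hypothetical helper showing the poll-and-sleep pattern: call the block
# until it returns a non-empty collection, sleeping between attempts.
def poll_until_present(sleep_interval)
  loop do
    result = yield
    return result unless result.empty?

    sleep(sleep_interval)
  end
end

attempts = 0
jobs = poll_until_present(0.01) do
  attempts += 1
  attempts < 3 ? [] : [:job_1, :job_2] # empty on the first two checks
end

p jobs     # [:job_1, :job_2]
p attempts # 3
```

A shorter interval lowers the delay before new work is noticed, at the cost of more frequent checks.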