Class: JobBoss::Boss

Inherits:
Object
  • Object
show all
Extended by:
ActiveSupport::Memoizable
Defined in:
lib/job_boss/boss.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Boss

Returns a new instance of Boss.



58
59
60
61
62
63
64
65
66
# File 'lib/job_boss/boss.rb', line 58

def initialize(options = {})
  config.application_root     ||= options[:working_dir]
  config.sleep_interval       ||= options[:sleep_interval]
  config.employee_limit       ||= options[:employee_limit]
  config.database_yaml_path   ||= options[:database_yaml_path]
  config.jobs_path            ||= options[:jobs_path]

  @running_jobs = []
end

Class Method Details

.configObject

Used to set Boss configuration Usage:

Boss.config.sleep_interval = 2


12
13
14
15
# File 'lib/job_boss/boss.rb', line 12

def config
  require 'job_boss/config'
  Config.new
end

.queue(attributes = {}) ⇒ Object

Used to queue jobs Usage:

Boss.queue.math.is_prime?(42)


21
22
23
24
# File 'lib/job_boss/boss.rb', line 21

def queue(attributes = {})
  require 'job_boss/queuer'
  Queuer.new(attributes)
end

.queue_path(path, *args) ⇒ Object

Used to queue jobs Usage:

Boss.queue_path('math#in_prime?', 42)


30
31
32
33
34
# File 'lib/job_boss/boss.rb', line 30

def queue_path(path, *args)
  controller, action = path.split('#')

  queue.send(controller).send(action, *args)
end

.resolve_path(path) ⇒ Object

If path starts with ‘/’, leave alone. Otherwise, prepend application_root



37
38
39
40
41
42
43
# File 'lib/job_boss/boss.rb', line 37

def resolve_path(path)
  if path == ?/ || path.match(/^#{config.application_root}/)
    path
  else
    File.join(config.application_root, path)
  end
end

Instance Method Details

#configObject



46
47
48
# File 'lib/job_boss/boss.rb', line 46

def config
  Boss.config
end

#dequeue_jobsObject

Dequeues next set of jobs based on prioritized round robin algorithm Priority of a particular queue determines how many jobs get pulled from that queue each time we dequeue A priority adjustment is also done to give greater priority to sets of jobs which have been running longer



127
128
129
130
131
132
133
134
135
136
# File 'lib/job_boss/boss.rb', line 127

def dequeue_jobs
  Job.pending.select('DISTINCT priority, path, batch_id').reorder(nil).collect do |distinct_job|
    queue_scope = Job.where(:path => distinct_job.path, :batch_id => distinct_job.batch_id)

    # Give queues which have are further along more priority to reduce latency
    priority_adjustment = ((queue_scope.completed.count.to_f / queue_scope.count) * config.employee_limit).floor

    queue_scope.pending.limit(distinct_job.priority + priority_adjustment)
  end.flatten.sort_by(&:id)
end

#loggerObject



50
51
52
53
54
55
# File 'lib/job_boss/boss.rb', line 50

def logger
  config.log_path = Boss.resolve_path(config.log_path)

  require 'logger'
  Logger.new(config.log_path)
end

#startObject

Start the boss



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/job_boss/boss.rb', line 69

def start
  require 'active_record'
  require 'yaml'

  establish_active_record_connection
  logger.info "Started ActiveRecord connection in '#{config.environment}' environment from database YAML: #{config.database_yaml_path}"

  require_job_classes

  Signal.trap("HUP") do
    stop
  end

  at_exit do
    stop if Process.pid == BOSS_PID
  end

  logger.info "Job Boss started"
  logger.info "Employee limit: #{Boss.config.employee_limit}"

  jobs = []
  while true
    available_employee_count = wait_for_available_employees

    if jobs.empty?
      jobs = wait_for_jobs
      jobs = jobs.to_a.select(&:mark_as_started)
    end

    [available_employee_count, jobs.size].min.times do
      job = jobs.shift
      job.dispatch(self)
      @running_jobs << job
    end

  end
end

#stopObject



138
139
140
141
142
143
144
# File 'lib/job_boss/boss.rb', line 138

def stop
  logger.info "Stopping #{@running_jobs.size} running employees..."

  shutdown_running_jobs

  logger.info "Job Boss stopped"
end

#wait_for_available_employeesObject

Waits until there is at least one available employee and then returns count



108
109
110
111
112
113
114
# File 'lib/job_boss/boss.rb', line 108

def wait_for_available_employees
  until (employee_count = available_employees) > 0
    sleep(config.sleep_interval)
  end

  employee_count
end

#wait_for_jobsObject



116
117
118
119
120
121
122
# File 'lib/job_boss/boss.rb', line 116

def wait_for_jobs
  while (jobs = dequeue_jobs).empty?
    sleep(config.sleep_interval)
  end

  jobs
end