Class: Woodhouse::Runners::FileRunner
- Inherits:
-
Woodhouse::Runner
- Object
- Woodhouse::Runner
- Woodhouse::Runners::FileRunner
- Defined in:
- lib/woodhouse/runners/file_runner.rb
Constant Summary collapse
- DEFAULT_QUEUE_DIR =
'/tmp/woodhouse/queue'
Instance Attribute Summary collapse
-
#jobs_dir ⇒ Object
Returns the value of attribute jobs_dir.
-
#queue_dir ⇒ Object
Returns the value of attribute queue_dir.
Instance Method Summary collapse
- #each_job(&block) ⇒ Object
-
#initialize(worker, config) ⇒ FileRunner
constructor
A new instance of FileRunner.
- #reserve_job(queue_id, &block) ⇒ Object
- #service_jobs ⇒ Object
- #spin_down ⇒ Object
- #subscribe ⇒ Object
Methods inherited from Woodhouse::Runner
Constructor Details
#initialize(worker, config) ⇒ FileRunner
8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/woodhouse/runners/file_runner.rb', line 8 def initialize(worker, config) super server_info = config.server_info || {} self.queue_dir = server_info[:path] || DEFAULT_QUEUE_DIR self.jobs_dir = "#{queue_dir}/jobs" unless File.directory?(jobs_dir) # subdirectory of queue_dir config.logger.debug "[Woodhouse initialize] Creating queue directory '#{queue_dir}'" FileUtils.mkdir_p jobs_dir end end |
Instance Attribute Details
#jobs_dir ⇒ Object
Returns the value of attribute jobs_dir.
4 5 6 |
# File 'lib/woodhouse/runners/file_runner.rb', line 4 def jobs_dir @jobs_dir end |
#queue_dir ⇒ Object
Returns the value of attribute queue_dir.
4 5 6 |
# File 'lib/woodhouse/runners/file_runner.rb', line 4 def queue_dir @queue_dir end |
Instance Method Details
#each_job(&block) ⇒ Object
41 42 43 44 45 46 47 48 49 50 |
# File 'lib/woodhouse/runners/file_runner.rb', line 41 def each_job(&block) queue = Dir["#{queue_dir}/j-*"].sort queue.each do |job_path| job = YAML.load(File.read(job_path)) queue_id = File.basename(job_path)[2..-1] yield(job, queue_id) end end |
#reserve_job(queue_id, &block) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/woodhouse/runners/file_runner.rb', line 52 def reserve_job(queue_id, &block) enqueued = "#{queue_dir}/j-#{queue_id}" processing = "#{queue_dir}/p-#{queue_id}" failed = "#{queue_dir}/f-#{queue_id}" begin FileUtils.mv(enqueued, processing) if yield # Success, clean up File.unlink(processing) end rescue Errno::ENOENT # Another worker beat us to the job false rescue => err # Woodhouse internal error occurred during processing File.open(processing, 'a') {|f| f.write YAML.dump(err) } raise ensure # If file still hanging around then it failed FileUtils.mv(processing, failed) if File.exists?(processing) end end |
#service_jobs ⇒ Object
33 34 35 36 37 38 39 |
# File 'lib/woodhouse/runners/file_runner.rb', line 33 def service_jobs each_job do |job,queue_id| if can_service_job?(job) reserve_job(queue_id) { service_job(job) } end end end |
#spin_down ⇒ Object
28 29 30 31 |
# File 'lib/woodhouse/runners/file_runner.rb', line 28 def spin_down @shutdown = true signal :spin_down end |
#subscribe ⇒ Object
21 22 23 24 25 26 |
# File 'lib/woodhouse/runners/file_runner.rb', line 21 def subscribe until @shutdown do service_jobs sleep 5 end end |