Class: Woodhouse::Runners::FileRunner

Inherits:
Woodhouse::Runner show all
Defined in:
lib/woodhouse/runners/file_runner.rb

Constant Summary collapse

DEFAULT_QUEUE_DIR =
'/tmp/woodhouse/queue'

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods inherited from Woodhouse::Runner

#current_status

Constructor Details

#initialize(worker, config) ⇒ FileRunner



8
9
10
11
12
13
14
15
16
17
18
19
# File 'lib/woodhouse/runners/file_runner.rb', line 8

def initialize(worker, config)
  super

  server_info = config.server_info || {}
  self.queue_dir = server_info[:path] || DEFAULT_QUEUE_DIR
  self.jobs_dir = "#{queue_dir}/jobs"

  unless File.directory?(jobs_dir) # subdirectory of queue_dir
    config.logger.debug "[Woodhouse initialize] Creating queue directory '#{queue_dir}'"
    FileUtils.mkdir_p jobs_dir
  end
end

Instance Attribute Details

#jobs_dirObject

Returns the value of attribute jobs_dir.



4
5
6
# File 'lib/woodhouse/runners/file_runner.rb', line 4

def jobs_dir
  @jobs_dir
end

#queue_dirObject

Returns the value of attribute queue_dir.



4
5
6
# File 'lib/woodhouse/runners/file_runner.rb', line 4

def queue_dir
  @queue_dir
end

Instance Method Details

#each_job(&block) ⇒ Object



41
42
43
44
45
46
47
48
49
50
# File 'lib/woodhouse/runners/file_runner.rb', line 41

def each_job(&block)
  queue = Dir["#{queue_dir}/j-*"].sort

  queue.each do |job_path|
    job = YAML.load(File.read(job_path))
    queue_id = File.basename(job_path)[2..-1]

    yield(job, queue_id)
  end
end

#reserve_job(queue_id, &block) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/woodhouse/runners/file_runner.rb', line 52

def reserve_job(queue_id, &block)
  enqueued   = "#{queue_dir}/j-#{queue_id}"
  processing = "#{queue_dir}/p-#{queue_id}"
  failed     = "#{queue_dir}/f-#{queue_id}"

  begin
    FileUtils.mv(enqueued, processing)

    if yield
      # Success, clean up
      File.unlink(processing)
    end

  rescue Errno::ENOENT
    # Another worker beat us to the job
    false

  rescue => err
    # Woodhouse internal error occurred during processing
    File.open(processing, 'a') {|f| f.write YAML.dump(err) }
    raise

  ensure
    # If file still hanging around then it failed
    FileUtils.mv(processing, failed) if File.exists?(processing)
  end
end

#service_jobsObject



33
34
35
36
37
38
39
# File 'lib/woodhouse/runners/file_runner.rb', line 33

def service_jobs
  each_job do |job,queue_id|
    if can_service_job?(job)
      reserve_job(queue_id) { service_job(job) }
    end
  end
end

#spin_downObject



28
29
30
31
# File 'lib/woodhouse/runners/file_runner.rb', line 28

def spin_down
  @shutdown = true
  signal :spin_down
end

#subscribeObject



21
22
23
24
25
26
# File 'lib/woodhouse/runners/file_runner.rb', line 21

def subscribe
  until @shutdown do
    service_jobs
    sleep 5
  end
end