Class: Komrade::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/komrade-client/worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ Worker

Returns a new instance of Worker.



7
8
9
# File 'lib/komrade-client/worker.rb', line 7

def initialize(args={})
  @running = true
end

Instance Method Details

#call(payload) ⇒ Object

Each job includes a method column. We will use ruby’s eval to grab the ruby object from memory. We send the method to the object and pass the args.



64
65
66
67
68
69
# File 'lib/komrade-client/worker.rb', line 64

def call(payload)
  args = payload["args"]
  klass = eval(payload["method"].split(".").first)
  message = payload["method"].split(".").last
  klass.send(message, *args)
end

#handle_failure(job, e) ⇒ Object

This method will be called when an exception is raised during the execution of the job.



73
74
75
76
77
78
# File 'lib/komrade-client/worker.rb', line 73

def handle_failure(job,e)
  fid = SecureRandom.uuid
  log(:at => "handle-failure", :id => job['id'], 'failure-id' => fid)
  b = {error: e.class, message: e.message}
  HttpHelpers.put("/jobs/#{job['id']}/failures/#{fid}", b)
end

#log(data, &blk) ⇒ Object



80
81
82
# File 'lib/komrade-client/worker.rb', line 80

def log(data, &blk)
  Komrade.log(data, &blk)
end

#startObject

Start a loop and work jobs indefinitely. Call this method to start the worker. This is the easiest way to start working jobs.



14
15
16
# File 'lib/komrade-client/worker.rb', line 14

def start
  work while @running
end

#stopObject

Call this method to stop the worker. The worker may not stop immediately if the worker is sleeping.



21
22
23
# File 'lib/komrade-client/worker.rb', line 21

def stop
  @running = false
end

#workObject

This method will lock a job & evaluate the code defined by the job. Also, this method will make the best attempt to delete the job from the queue before returning.

Before the worker evaluates the code extracted from the job, it spawns a thread which will send heartbeats to komrade. This indicates to the back end that the job is being processed. If heartbeats stop coming in for a job, komrade may thing that the job is lost and subsequently release the lock and place it back in the queue.



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/komrade-client/worker.rb', line 34

def work
  jobs = Queue.dequeue
  until jobs.empty?
    job = jobs.pop
    begin
      log(:at => "work-job", :id => job['id']) do
        @finished, @beats = false, 0
        Thread.new do
          while @beats == 0 || !@finished
            @beats += 1
            log(:at => "heartbeat-job", :id => job['id'])
            HttpHelpers.post("/jobs/#{job['id']}/heartbeats")
            sleep(1)
          end
        end
        call(job["payload"])
        @finished = true
      end
    rescue => e
      handle_failure(job, e)
      raise(e)
    ensure
      Queue.remove(job["id"])
    end
  end
end