Class: Komrade::Worker
- Inherits: Object
  - Object
  - Komrade::Worker
- Defined in: lib/komrade-client/worker.rb
Instance Method Summary
- #call(payload) ⇒ Object
  Each job includes a method column.
- #handle_failure(job, e) ⇒ Object
  This method will be called when an exception is raised during the execution of the job.
- #initialize(args = {}) ⇒ Worker (constructor)
  A new instance of Worker.
- #log(data, &blk) ⇒ Object
- #start ⇒ Object
  Start a loop and work jobs indefinitely.
- #stop ⇒ Object
  Call this method to stop the worker.
- #work ⇒ Object
  This method will lock a job & evaluate the code defined by the job.
Constructor Details
#initialize(args = {}) ⇒ Worker
Returns a new instance of Worker.
    # File 'lib/komrade-client/worker.rb', line 7
    def initialize(args={})
      @running = true
    end
Instance Method Details
#call(payload) ⇒ Object
Each job includes a method column of the form "Receiver.method". The worker uses Ruby's eval to resolve the receiver object from the part before the dot, then sends the method named after the dot to that object, passing the job's args.
    # File 'lib/komrade-client/worker.rb', line 64
    def call(payload)
      args = payload["args"]
      klass = eval(payload["method"].split(".").first)
      message = payload["method"].split(".").last
      klass.send(message, *args)
    end
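To make the dispatch concrete, here is a minimal sketch of the same idea outside the worker. The `Mailer` module and the `dispatch` helper are hypothetical names used only for illustration, and the sketch swaps `eval` for `Object.const_get`, a stricter constant lookup, so it is not a copy of what the worker itself does.

```ruby
# Hypothetical job target; not part of komrade-client.
module Mailer
  def self.welcome(name, greeting)
    "#{greeting}, #{name}!"
  end
end

# Hypothetical helper that follows the same steps as Worker#call:
# split "Receiver.method", resolve the receiver, send the method with args.
def dispatch(payload)
  receiver_name, method_name = payload["method"].split(".")
  receiver = Object.const_get(receiver_name) # constant lookup instead of eval
  receiver.public_send(method_name, *payload["args"])
end

payload = { "method" => "Mailer.welcome", "args" => ["Ada", "Hello"] }
result = dispatch(payload)
puts result
```

The payload shape ("method" and "args" keys) is taken from the source above; everything else in the sketch is an assumption.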
#handle_failure(job, e) ⇒ Object
This method will be called when an exception is raised during the execution of the job.
    # File 'lib/komrade-client/worker.rb', line 73
    def handle_failure(job,e)
      fid = SecureRandom.uuid
      log(:at => "handle-failure", :id => job['id'], 'failure-id' => fid)
      b = {error: e.class, message: e.message}
      HttpHelpers.put("/jobs/#{job['id']}/failures/#{fid}", b)
    end
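As a rough illustration of when this hook fires, the sketch below reproduces the rescue/ensure ordering used by #work. It uses a hypothetical `run_job` method and an `events` array in place of Komrade's HTTP calls, so it only demonstrates the control flow: the failure is recorded first, the exception is re-raised, and the job is removed either way.

```ruby
require "securerandom"

# Hypothetical stand-in for processing one job; records each step in `events`
# instead of talking to the Komrade back end.
def run_job(job, events)
  yield
  events << [:finished, job["id"]]
rescue => e
  failure_id = SecureRandom.uuid
  # Same two fields the worker reports: the error class and its message.
  events << [:failure, job["id"], failure_id, { error: e.class, message: e.message }]
  raise
ensure
  # Runs whether the job succeeded or failed.
  events << [:removed, job["id"]]
end

events = []
begin
  run_job({ "id" => 42 }, events) { raise ArgumentError, "bad input" }
rescue ArgumentError
  events << [:propagated]
end

puts events.map(&:first).inspect
```

Because the exception is re-raised, whatever calls the worker still sees the original error after the failure has been reported.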
#log(data, &blk) ⇒ Object
    # File 'lib/komrade-client/worker.rb', line 80
    def log(data, &blk)
      Komrade.log(data, &blk)
    end
#start ⇒ Object
Start a loop and work jobs indefinitely. Call this method to start the worker. This is the easiest way to start working jobs.
    # File 'lib/komrade-client/worker.rb', line 14
    def start
      work while @running
    end
#stop ⇒ Object
Call this method to stop the worker. The worker may not stop immediately if the worker is sleeping.
    # File 'lib/komrade-client/worker.rb', line 21
    def stop
      @running = false
    end
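The interaction between #start and #stop can be sketched with a reduced stand-in class. `LoopingWorker` and its `iterations` counter are hypothetical and exist only for this example; the real worker dequeues jobs in #work rather than incrementing a counter.

```ruby
# Hypothetical stand-in for Komrade::Worker, reduced to the run loop.
class LoopingWorker
  attr_reader :iterations

  def initialize
    @running = true
    @iterations = 0
  end

  # Same shape as Worker#start: keep working until the flag is cleared.
  def start
    work while @running
  end

  # Same shape as Worker#stop: clear the flag; the loop exits after the
  # current call to work returns.
  def stop
    @running = false
  end

  # Placeholder for one pass of job processing.
  def work
    @iterations += 1
    sleep(0.01)
  end
end

worker = LoopingWorker.new
thread = Thread.new { worker.start }  # start blocks, so run it in a thread
sleep(0.01) until worker.iterations > 0
worker.stop
thread.join
```

Since the flag is only checked between calls to work, a stop request takes effect once the current pass (including any sleep inside it) has finished.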
#work ⇒ Object
This method will lock a job & evaluate the code defined by the job. Also, this method will make the best attempt to delete the job from the queue before returning.
Before the worker evaluates the code extracted from the job, it spawns a thread which will send heartbeats to Komrade. This indicates to the back end that the job is being processed. If heartbeats stop coming in for a job, Komrade may think that the job is lost and subsequently release the lock and place the job back in the queue.
    # File 'lib/komrade-client/worker.rb', line 34
    def work
      jobs = Queue.dequeue
      until jobs.empty?
        job = jobs.pop
        begin
          log(:at => "work-job", :id => job['id']) do
            @finished, @beats = false, 0
            Thread.new do
              while @beats == 0 || !@finished
                @beats += 1
                log(:at => "heartbeat-job", :id => job['id'])
                HttpHelpers.post("/jobs/#{job['id']}/heartbeats")
                sleep(1)
              end
            end
            call(job["payload"])
            @finished = true
          end
        rescue => e
          handle_failure(job, e)
          raise(e)
        ensure
          Queue.remove(job["id"])
        end
      end
    end
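The heartbeat pattern can be isolated in a small sketch. The `with_heartbeat` helper and the `on_beat` callback are hypothetical; the sketch uses local variables instead of the worker's instance variables, replaces the HTTP post with a callback, and joins the heartbeat thread on exit, which the source above does not do.

```ruby
# Hypothetical helper: runs the given block while a background thread
# emits heartbeats every `interval` seconds.
def with_heartbeat(interval, on_beat)
  finished = false
  beats = 0
  beater = Thread.new do
    # Like the worker's loop: always beat at least once, then keep
    # beating until the job is marked finished.
    while beats == 0 || !finished
      beats += 1
      on_beat.call(beats)
      sleep(interval)
    end
  end
  yield
ensure
  # Stop the heartbeat even if the block raised, then wait for the thread.
  finished = true
  beater.join if beater
end

beats_seen = []
result = with_heartbeat(0.01, ->(n) { beats_seen << n }) do
  sleep(0.05) # stands in for the job's actual work
  :done
end

puts "result=#{result} first_beat=#{beats_seen.first}"
```

The `beats == 0` check guarantees at least one heartbeat even for a job that finishes before the thread is first scheduled.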