Class: Isono::NodeModules::JobWorker
- Includes:
- Logger
- Defined in:
- lib/isono/node_modules/job_worker.rb
Defined Under Namespace
Classes: JobContext
Constant Summary collapse
- JOB_CTX_KEY =
:job_worker_ctx
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary collapse
-
#run(run_cb = nil, fail_cb = nil, &blk) ⇒ Object
Run the block/proc.
-
#start { ... } ⇒ Object
Start a new long term job.
Methods included from Logger
Methods inherited from Base
#config_section, #initialize, #manifest, #value_object
Constructor Details
This class inherits a constructor from Isono::NodeModules::Base
Instance Method Details
#run(run_cb = nil, fail_cb = nil, &blk) ⇒ Object
Run the block/proc. This is simple utility method for start().
96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/isono/node_modules/job_worker.rb', line 96 def run(run_cb=nil, fail_cb=nil, &blk) if run_cb.is_a?(Proc) start do |job| job.run_cb = run_cb job.fail_cb = fail_cb if fail_cb.is_a?(Proc) end elsif blk start do |job| job.run_cb = blk end end end |
#start { ... } ⇒ Object
Start a new long term job.
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/isono/node_modules/job_worker.rb', line 44 def start(&blk) job = JobContext.new() blk.call(job) @active_jobs[job.job_id] = job rpc = RpcChannel.new(node) rpc.request('job-collector', 'regist', job.to_hash) { |req| req.oneshot = true } @thread_pool.pass { begin Thread.current[JOB_CTX_KEY]=job job.stm.on_start rpc.request('job-collector', 'update', job.to_hash) { |req| req.oneshot = true } job.run_cb.call job.stm.on_done rescue Exception => e job.stm.on_fail(e) if job.fail_cb job.fail_cb.arity == 1 ? job.fail_cb.call(e) : job.fail_cb.call end ensure Thread.current[JOB_CTX_KEY]=nil EventMachine.schedule { rpc.request('job-collector', 'update', job.to_hash) { |req| req.oneshot = true } @active_jobs.delete(job.job_id) } end } job end |