Class: Isono::NodeModules::JobWorker
- Includes:
- Logger
- Defined in:
- lib/isono/node_modules/job_worker.rb
Defined Under Namespace
Classes: JobContext
Constant Summary collapse
- JOB_CTX_KEY =
:job_worker_ctx
Instance Attribute Summary
Attributes inherited from Base
Instance Method Summary collapse
-
#initialize(node, thread_pool = nil) ⇒ JobWorker
constructor
A new instance of JobWorker.
-
#run(run_cb = nil, fail_cb = nil, &blk) ⇒ Object
Run the block/proc.
-
#start { ... } ⇒ Object
Start a new long term job.
Methods included from Logger
Methods inherited from Base
#config_section, #manifest, #value_object
Constructor Details
#initialize(node, thread_pool = nil) ⇒ JobWorker
Returns a new instance of JobWorker.
34 35 36 37 38 |
# File 'lib/isono/node_modules/job_worker.rb', line 34
#
# Build a JobWorker bound to +node+.
#
# @param node passed straight through to the superclass constructor.
# @param thread_pool [ThreadPool, nil] pool used to execute jobs; when nil
#   the value of @default_thread_pool is used instead.
# @raise [ArgumentError] if thread_pool is neither nil nor a ThreadPool.
def initialize(node, thread_pool=nil)
  super(node)

  # Reject anything that is not an explicit ThreadPool (nil means "use default").
  if !thread_pool.nil? && !thread_pool.is_a?(ThreadPool)
    raise ArgumentError
  end

  @thread_pool = thread_pool.nil? ? @default_thread_pool : thread_pool
end
Instance Method Details
#run(run_cb = nil, fail_cb = nil, &blk) ⇒ Object
Run the block/proc. This is a simple utility method for start(): it starts a job whose run callback is run_cb when that is a Proc, or otherwise the given block.
107 108 109 110 111 112 113 114 115 116 117 118 |
# File 'lib/isono/node_modules/job_worker.rb', line 107
#
# Convenience wrapper around #start that runs a proc or a block as a job.
#
# @param run_cb [Proc, nil] job body; takes precedence over the block.
# @param fail_cb [Proc, nil] failure callback; only applied together with
#   a Proc run_cb, and only when it is itself a Proc.
# @yield job body used when run_cb is not a Proc.
# @return whatever #start returns, or nil when neither a Proc run_cb nor a
#   block was supplied (no job is started in that case).
def run(run_cb=nil, fail_cb=nil, &blk)
  unless run_cb.is_a?(Proc)
    # No proc given: fall back to the block, or do nothing at all.
    return nil unless blk
    return start { |ctx| ctx.run_cb = blk }
  end

  start { |ctx|
    ctx.run_cb = run_cb
    ctx.fail_cb = fail_cb if fail_cb.is_a?(Proc)
  }
end
#start { ... } ⇒ Object
Start a new long-term job.
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/isono/node_modules/job_worker.rb', line 50
#
# Start a new long-term job.
#
# Creates a JobContext, hands it to the caller's block for configuration
# (e.g. setting run_cb / fail_cb), registers it in @active_jobs and sends
# its state to the 'job-collector' endpoint. The job body (job.run_cb) is
# then passed to @thread_pool for execution.
#
# The job state is sent to 'job-collector' three times: on registration,
# after the :on_start event, and once the job has finished or failed.
#
# @yield [job] the freshly created JobContext, to be configured by the caller.
# @return [JobContext] the job that was started.
def start(&blk)
  job = JobContext.new()
  blk.call(job)
  @active_jobs[job.job_id] = job
  rpc = RpcChannel.new(node)

  rpc.request('job-collector', 'record', job.to_hash) { |req|
    req.oneshot = true
  }

  @thread_pool.pass {
    begin
      # Expose the running job to code executing on this worker thread.
      Thread.current[JOB_CTX_KEY]=job
      job.process_event(:on_start)
      rpc.request('job-collector', 'record', job.to_hash) { |req|
        req.oneshot = true
      }
      job.run_cb.call
      job.process_event(:on_done)
    rescue Exception => e
      # Exception (not just StandardError) is rescued so that any failure of
      # the job body is recorded through the :on_fail event.
      job.process_event(:on_fail, e)
      if job.fail_cb
        begin
          # The failure callback may or may not want the exception.
          job.fail_cb.arity == 1 ? job.fail_cb.call(e) : job.fail_cb.call
        rescue Exception => cb_error
          # The job id lives on the job object; a bare `job_id` is not
          # defined in this scope and would raise NameError from here.
          logger.error("Failed to call the failure call back #{job.fail_cb}: #{job.job_id}: #{cb_error}")
          logger.error(cb_error)
        end
      end
    ensure
      Thread.current[JOB_CTX_KEY]=nil
      # The final report and the removal from @active_jobs are handed to
      # EventMachine.schedule rather than performed directly here.
      EventMachine.schedule {
        rpc.request('job-collector', 'record', job.to_hash) { |req|
          req.oneshot = true
        }
        @active_jobs.delete(job.job_id)
      }
    end
  }
  job
end